summaryrefslogtreecommitdiff
path: root/s
diff options
context:
space:
mode:
Diffstat (limited to 's')
-rw-r--r--s/chunk.cpp628
-rw-r--r--s/chunk.h221
-rw-r--r--s/commands_admin.cpp696
-rw-r--r--s/commands_public.cpp368
-rw-r--r--s/config.cpp535
-rw-r--r--s/config.h195
-rw-r--r--s/cursors.cpp104
-rw-r--r--s/cursors.h56
-rw-r--r--s/d_logic.cpp542
-rw-r--r--s/d_logic.h23
-rw-r--r--s/d_util.cpp36
-rw-r--r--s/dbgrid.vcproj660
-rw-r--r--s/dbgrid.vcxproj201
-rw-r--r--s/request.cpp175
-rw-r--r--s/request.h120
-rw-r--r--s/s_only.cpp29
-rw-r--r--s/server.cpp202
-rw-r--r--s/server.h30
-rw-r--r--s/shardkey.cpp320
-rw-r--r--s/shardkey.h128
-rw-r--r--s/strategy.cpp221
-rw-r--r--s/strategy.h36
-rw-r--r--s/strategy_shard.cpp260
-rw-r--r--s/strategy_single.cpp131
-rw-r--r--s/util.h51
25 files changed, 5968 insertions, 0 deletions
diff --git a/s/chunk.cpp b/s/chunk.cpp
new file mode 100644
index 0000000..47c13e8
--- /dev/null
+++ b/s/chunk.cpp
@@ -0,0 +1,628 @@
+// shard.cpp
+
+/**
+ * Copyright (C) 2008 10gen Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "stdafx.h"
+#include "chunk.h"
+#include "config.h"
+#include "../util/unittest.h"
+#include "../client/connpool.h"
+#include "cursors.h"
+#include "strategy.h"
+
+namespace mongo {
+
+ // ------- Shard --------
+
+ long Chunk::MaxChunkSize = 1024 * 1204 * 50;
+
+ Chunk::Chunk( ChunkManager * manager ) : _manager( manager ){
+ _modified = false;
+ _lastmod = 0;
+ _dataWritten = 0;
+ }
+
+ void Chunk::setShard( string s ){
+ _shard = s;
+ _markModified();
+ }
+
+ bool Chunk::contains( const BSONObj& obj ){
+ return
+ _manager->getShardKey().compare( getMin() , obj ) <= 0 &&
+ _manager->getShardKey().compare( obj , getMax() ) < 0;
+ }
+
+ BSONObj Chunk::pickSplitPoint(){
+ int sort = 0;
+
+ if ( _manager->getShardKey().globalMin().woCompare( getMin() ) == 0 ){
+ sort = 1;
+ }
+ else if ( _manager->getShardKey().globalMax().woCompare( getMax() ) == 0 ){
+ sort = -1;
+ }
+
+ if ( sort ){
+ ScopedDbConnection conn( getShard() );
+ Query q;
+ if ( sort == 1 )
+ q.sort( _manager->getShardKey().key() );
+ else {
+ BSONObj k = _manager->getShardKey().key();
+ BSONObjBuilder r;
+
+ BSONObjIterator i(k);
+ while( i.more() ) {
+ BSONElement e = i.next();
+ uassert( 10163 , "can only handle numbers here - which i think is correct" , e.isNumber() );
+ r.append( e.fieldName() , -1 * e.number() );
+ }
+
+ q.sort( r.obj() );
+ }
+ BSONObj end = conn->findOne( _ns , q );
+ conn.done();
+
+ if ( ! end.isEmpty() )
+ return _manager->getShardKey().extractKey( end );
+ }
+
+ ScopedDbConnection conn( getShard() );
+ BSONObj result;
+ if ( ! conn->runCommand( "admin" , BSON( "medianKey" << _ns
+ << "keyPattern" << _manager->getShardKey().key()
+ << "min" << getMin()
+ << "max" << getMax()
+ ) , result ) ){
+ stringstream ss;
+ ss << "medianKey command failed: " << result;
+ uassert( 10164 , ss.str() , 0 );
+ }
+ conn.done();
+
+ return result.getObjectField( "median" ).getOwned();
+ }
+
+ Chunk * Chunk::split(){
+ return split( pickSplitPoint() );
+ }
+
+ Chunk * Chunk::split( const BSONObj& m ){
+ uassert( 10165 , "can't split as shard that doesn't have a manager" , _manager );
+
+ log(1) << " before split on: " << m << "\n"
+ << "\t self : " << toString() << endl;
+
+ uassert( 10166 , "locking namespace on server failed" , lockNamespaceOnServer( getShard() , _ns ) );
+
+ Chunk * s = new Chunk( _manager );
+ s->_ns = _ns;
+ s->_shard = _shard;
+ s->setMin(m.getOwned());
+ s->setMax(_max);
+
+ s->_markModified();
+ _markModified();
+
+ _manager->_chunks.push_back( s );
+
+ setMax(m.getOwned());
+
+ log(1) << " after split:\n"
+ << "\t left : " << toString() << "\n"
+ << "\t right: "<< s->toString() << endl;
+
+
+ _manager->save();
+
+ return s;
+ }
+
+ bool Chunk::moveAndCommit( const string& to , string& errmsg ){
+ uassert( 10167 , "can't move shard to its current location!" , to != getShard() );
+
+ log() << "moving chunk ns: " << _ns << " moving chunk: " << toString() << " " << _shard << " -> " << to << endl;
+
+ string from = _shard;
+ ShardChunkVersion oldVersion = _manager->getVersion( from );
+
+ BSONObj filter;
+ {
+ BSONObjBuilder b;
+ getFilter( b );
+ filter = b.obj();
+ }
+
+ ScopedDbConnection fromconn( from );
+
+ BSONObj startRes;
+ bool worked = fromconn->runCommand( "admin" ,
+ BSON( "movechunk.start" << _ns <<
+ "from" << from <<
+ "to" << to <<
+ "filter" << filter
+ ) ,
+ startRes
+ );
+
+ if ( ! worked ){
+ errmsg = (string)"movechunk.start failed: " + startRes.toString();
+ fromconn.done();
+ return false;
+ }
+
+ // update config db
+ setShard( to );
+
+ // need to increment version # for old server
+ Chunk * randomChunkOnOldServer = _manager->findChunkOnServer( from );
+ if ( randomChunkOnOldServer )
+ randomChunkOnOldServer->_markModified();
+
+ _manager->save();
+
+ BSONObj finishRes;
+ {
+
+ ShardChunkVersion newVersion = _manager->getVersion( from );
+ if ( newVersion == 0 && oldVersion > 0 ){
+ newVersion = oldVersion;
+ newVersion++;
+ _manager->save();
+ }
+ else if ( newVersion <= oldVersion ){
+ log() << "newVersion: " << newVersion << " oldVersion: " << oldVersion << endl;
+ uassert( 10168 , "version has to be higher" , newVersion > oldVersion );
+ }
+
+ BSONObjBuilder b;
+ b << "movechunk.finish" << _ns;
+ b << "to" << to;
+ b.appendTimestamp( "newVersion" , newVersion );
+ b.append( startRes["finishToken"] );
+
+ worked = fromconn->runCommand( "admin" ,
+ b.done() ,
+ finishRes );
+ }
+
+ if ( ! worked ){
+ errmsg = (string)"movechunk.finish failed: " + finishRes.toString();
+ fromconn.done();
+ return false;
+ }
+
+ fromconn.done();
+ return true;
+ }
+
+ bool Chunk::splitIfShould( long dataWritten ){
+ _dataWritten += dataWritten;
+
+ if ( _dataWritten < MaxChunkSize / 5 )
+ return false;
+
+ _dataWritten = 0;
+
+ if ( _min.woCompare( _max ) == 0 ){
+ log() << "SHARD PROBLEM** shard is too big, but can't split: " << toString() << endl;
+ return false;
+ }
+
+ long size = getPhysicalSize();
+ if ( size < MaxChunkSize )
+ return false;
+
+ log() << "autosplitting " << _ns << " size: " << size << " shard: " << toString() << endl;
+ Chunk * newShard = split();
+
+ moveIfShould( newShard );
+
+ return true;
+ }
+
+ bool Chunk::moveIfShould( Chunk * newChunk ){
+ Chunk * toMove = 0;
+
+ if ( newChunk->countObjects() <= 1 ){
+ toMove = newChunk;
+ }
+ else if ( this->countObjects() <= 1 ){
+ toMove = this;
+ }
+ else {
+ log(1) << "don't know how to decide if i should move inner shard" << endl;
+ }
+
+ if ( ! toMove )
+ return false;
+
+ string newLocation = grid.pickShardForNewDB();
+ if ( newLocation == getShard() ){
+ // if this is the best server, then we shouldn't do anything!
+ log(1) << "not moving chunk: " << toString() << " b/c would move to same place " << newLocation << " -> " << getShard() << endl;
+ return 0;
+ }
+
+ log() << "moving chunk (auto): " << toMove->toString() << " to: " << newLocation << " #objcets: " << toMove->countObjects() << endl;
+
+ string errmsg;
+ massert( 10412 , (string)"moveAndCommit failed: " + errmsg ,
+ toMove->moveAndCommit( newLocation , errmsg ) );
+
+ return true;
+ }
+
+ long Chunk::getPhysicalSize(){
+ ScopedDbConnection conn( getShard() );
+
+ BSONObj result;
+ uassert( 10169 , "datasize failed!" , conn->runCommand( "admin" , BSON( "datasize" << _ns
+ << "keyPattern" << _manager->getShardKey().key()
+ << "min" << getMin()
+ << "max" << getMax()
+ ) , result ) );
+
+ conn.done();
+ return (long)result["size"].number();
+ }
+
+
+ long Chunk::countObjects( const BSONObj& filter ){
+ ScopedDbConnection conn( getShard() );
+
+ BSONObj f = getFilter();
+ if ( ! filter.isEmpty() )
+ f = ClusteredCursor::concatQuery( f , filter );
+
+ BSONObj result;
+ unsigned long long n = conn->count( _ns , f );
+
+ conn.done();
+ return (long)n;
+ }
+
+ bool Chunk::operator==( const Chunk& s ){
+ return
+ _manager->getShardKey().compare( _min , s._min ) == 0 &&
+ _manager->getShardKey().compare( _max , s._max ) == 0
+ ;
+ }
+
+ void Chunk::getFilter( BSONObjBuilder& b ){
+ _manager->_key.getFilter( b , _min , _max );
+ }
+
+ void Chunk::serialize(BSONObjBuilder& to){
+ if ( _lastmod )
+ to.appendTimestamp( "lastmod" , _lastmod );
+ else
+ to.appendTimestamp( "lastmod" );
+
+ to << "ns" << _ns;
+ to << "min" << _min;
+ to << "max" << _max;
+ to << "shard" << _shard;
+ }
+
+ void Chunk::unserialize(const BSONObj& from){
+ _ns = from.getStringField( "ns" );
+ _shard = from.getStringField( "shard" );
+ _lastmod = from.hasField( "lastmod" ) ? from["lastmod"]._numberLong() : 0;
+
+ BSONElement e = from["minDotted"];
+ cout << from << endl;
+ if (e.eoo()){
+ _min = from.getObjectField( "min" ).getOwned();
+ _max = from.getObjectField( "max" ).getOwned();
+ } else { // TODO delete this case after giving people a chance to migrate
+ _min = e.embeddedObject().getOwned();
+ _max = from.getObjectField( "maxDotted" ).getOwned();
+ }
+
+ uassert( 10170 , "Chunk needs a ns" , ! _ns.empty() );
+ uassert( 10171 , "Chunk needs a server" , ! _ns.empty() );
+
+ uassert( 10172 , "Chunk needs a min" , ! _min.isEmpty() );
+ uassert( 10173 , "Chunk needs a max" , ! _max.isEmpty() );
+ }
+
+ string Chunk::modelServer() {
+ // TODO: this could move around?
+ return configServer.modelServer();
+ }
+
+ void Chunk::_markModified(){
+ _modified = true;
+ // set to 0 so that the config server sets it
+ _lastmod = 0;
+ }
+
+ void Chunk::save( bool check ){
+ bool reload = ! _lastmod;
+ Model::save( check );
+ if ( reload ){
+ // need to do this so that we get the new _lastMod and therefore version number
+ massert( 10413 , "_id has to be filled in already" , ! _id.isEmpty() );
+
+ string b = toString();
+ BSONObj q = _id.copy();
+ massert( 10414 , "how could load fail?" , load( q ) );
+ log(2) << "before: " << q << "\t" << b << endl;
+ log(2) << "after : " << _id << "\t" << toString() << endl;
+ massert( 10415 , "chunk reload changed content!" , b == toString() );
+ massert( 10416 , "id changed!" , q["_id"] == _id["_id"] );
+ }
+ }
+
+ void Chunk::ensureIndex(){
+ ScopedDbConnection conn( getShard() );
+ conn->ensureIndex( _ns , _manager->getShardKey().key() , _manager->_unique );
+ conn.done();
+ }
+
+ string Chunk::toString() const {
+ stringstream ss;
+ ss << "shard ns:" << _ns << " shard: " << _shard << " min: " << _min << " max: " << _max;
+ return ss.str();
+ }
+
+
+ ShardKeyPattern Chunk::skey(){
+ return _manager->getShardKey();
+ }
+
+ // ------- ChunkManager --------
+
+ unsigned long long ChunkManager::NextSequenceNumber = 1;
+
+ ChunkManager::ChunkManager( DBConfig * config , string ns , ShardKeyPattern pattern , bool unique ) :
+ _config( config ) , _ns( ns ) , _key( pattern ) , _unique( unique ){
+ Chunk temp(0);
+
+ ScopedDbConnection conn( temp.modelServer() );
+ auto_ptr<DBClientCursor> cursor = conn->query( temp.getNS() , BSON( "ns" << ns ) );
+ while ( cursor->more() ){
+ BSONObj d = cursor->next();
+ if ( d["isMaxMarker"].trueValue() ){
+ continue;
+ }
+
+ Chunk * c = new Chunk( this );
+ c->unserialize( d );
+ _chunks.push_back( c );
+ c->_id = d["_id"].wrap().getOwned();
+ }
+ conn.done();
+
+ if ( _chunks.size() == 0 ){
+ Chunk * c = new Chunk( this );
+ c->_ns = ns;
+ c->setMin(_key.globalMin());
+ c->setMax(_key.globalMax());
+ c->_shard = config->getPrimary();
+ c->_markModified();
+
+ _chunks.push_back( c );
+
+ log() << "no chunks for:" << ns << " so creating first: " << c->toString() << endl;
+ }
+
+ _sequenceNumber = ++NextSequenceNumber;
+ }
+
+ ChunkManager::~ChunkManager(){
+ for ( vector<Chunk*>::iterator i=_chunks.begin(); i != _chunks.end(); i++ ){
+ delete( *i );
+ }
+ _chunks.clear();
+ }
+
+ bool ChunkManager::hasShardKey( const BSONObj& obj ){
+ return _key.hasShardKey( obj );
+ }
+
+ Chunk& ChunkManager::findChunk( const BSONObj & obj ){
+
+ for ( vector<Chunk*>::iterator i=_chunks.begin(); i != _chunks.end(); i++ ){
+ Chunk * c = *i;
+ if ( c->contains( obj ) )
+ return *c;
+ }
+ stringstream ss;
+ ss << "couldn't find a chunk which should be impossible extracted: " << _key.extractKey( obj );
+ throw UserException( 8070 , ss.str() );
+ }
+
+ Chunk* ChunkManager::findChunkOnServer( const string& server ) const {
+
+ for ( vector<Chunk*>::const_iterator i=_chunks.begin(); i!=_chunks.end(); i++ ){
+ Chunk * c = *i;
+ if ( c->getShard() == server )
+ return c;
+ }
+
+ return 0;
+ }
+
+ int ChunkManager::getChunksForQuery( vector<Chunk*>& chunks , const BSONObj& query ){
+ int added = 0;
+
+ for ( vector<Chunk*>::iterator i=_chunks.begin(); i != _chunks.end(); i++ ){
+ Chunk * c = *i;
+ if ( _key.relevantForQuery( query , c ) ){
+ chunks.push_back( c );
+ added++;
+ }
+ }
+ return added;
+ }
+
+ void ChunkManager::getAllServers( set<string>& allServers ){
+ for ( vector<Chunk*>::iterator i=_chunks.begin(); i != _chunks.end(); i++ ){
+ allServers.insert( (*i)->getShard() );
+ }
+ }
+
+ void ChunkManager::ensureIndex(){
+ set<string> seen;
+
+ for ( vector<Chunk*>::const_iterator i=_chunks.begin(); i!=_chunks.end(); i++ ){
+ Chunk * c = *i;
+ if ( seen.count( c->getShard() ) )
+ continue;
+ seen.insert( c->getShard() );
+ c->ensureIndex();
+ }
+ }
+
+ void ChunkManager::drop(){
+ uassert( 10174 , "config servers not all up" , configServer.allUp() );
+
+ map<string,ShardChunkVersion> seen;
+
+ log(1) << "ChunkManager::drop : " << _ns << endl;
+
+ // lock all shards so no one can do a split/migrate
+ for ( vector<Chunk*>::const_iterator i=_chunks.begin(); i!=_chunks.end(); i++ ){
+ Chunk * c = *i;
+ ShardChunkVersion& version = seen[ c->getShard() ];
+ if ( version )
+ continue;
+ version = lockNamespaceOnServer( c->getShard() , _ns );
+ if ( version )
+ continue;
+
+ // rollback
+ uassert( 10175 , "don't know how to rollback locks b/c drop can't lock all shards" , 0 );
+ }
+
+ log(1) << "ChunkManager::drop : " << _ns << "\t all locked" << endl;
+
+ // wipe my meta-data
+ _chunks.clear();
+
+
+ // delete data from mongod
+ for ( map<string,ShardChunkVersion>::iterator i=seen.begin(); i!=seen.end(); i++ ){
+ string shard = i->first;
+ ScopedDbConnection conn( shard );
+ conn->dropCollection( _ns );
+ conn.done();
+ }
+
+ log(1) << "ChunkManager::drop : " << _ns << "\t removed shard data" << endl;
+
+ // clean up database meta-data
+ uassert( 10176 , "no sharding data?" , _config->removeSharding( _ns ) );
+ _config->save();
+
+
+ // remove chunk data
+ Chunk temp(0);
+ ScopedDbConnection conn( temp.modelServer() );
+ conn->remove( temp.getNS() , BSON( "ns" << _ns ) );
+ conn.done();
+ log(1) << "ChunkManager::drop : " << _ns << "\t removed chunk data" << endl;
+
+ for ( map<string,ShardChunkVersion>::iterator i=seen.begin(); i!=seen.end(); i++ ){
+ ScopedDbConnection conn( i->first );
+ BSONObj res;
+ if ( ! setShardVersion( conn.conn() , _ns , 0 , true , res ) )
+ throw UserException( 8071 , (string)"OH KNOW, cleaning up after drop failed: " + res.toString() );
+ conn.done();
+ }
+
+
+ log(1) << "ChunkManager::drop : " << _ns << "\t DONE" << endl;
+ }
+
+ void ChunkManager::save(){
+ ShardChunkVersion a = getVersion();
+
+ set<string> withRealChunks;
+
+ for ( vector<Chunk*>::const_iterator i=_chunks.begin(); i!=_chunks.end(); i++ ){
+ Chunk* c = *i;
+ if ( ! c->_modified )
+ continue;
+ c->save( true );
+ _sequenceNumber = ++NextSequenceNumber;
+
+ withRealChunks.insert( c->getShard() );
+ }
+
+ massert( 10417 , "how did version get smalled" , getVersion() >= a );
+
+ ensureIndex(); // TODO: this is too aggressive - but not really sooo bad
+ }
+
+ ShardChunkVersion ChunkManager::getVersion( const string& server ) const{
+ // TODO: cache or something?
+
+ ShardChunkVersion max = 0;
+
+ for ( vector<Chunk*>::const_iterator i=_chunks.begin(); i!=_chunks.end(); i++ ){
+ Chunk* c = *i;
+ if ( c->getShard() != server )
+ continue;
+
+ if ( c->_lastmod > max )
+ max = c->_lastmod;
+ }
+
+ return max;
+ }
+
+ ShardChunkVersion ChunkManager::getVersion() const{
+ ShardChunkVersion max = 0;
+
+ for ( vector<Chunk*>::const_iterator i=_chunks.begin(); i!=_chunks.end(); i++ ){
+ Chunk* c = *i;
+ if ( c->_lastmod > max )
+ max = c->_lastmod;
+ }
+
+ return max;
+ }
+
+ string ChunkManager::toString() const {
+ stringstream ss;
+ ss << "ChunkManager: " << _ns << " key:" << _key.toString() << "\n";
+ for ( vector<Chunk*>::const_iterator i=_chunks.begin(); i!=_chunks.end(); i++ ){
+ const Chunk* c = *i;
+ ss << "\t" << c->toString() << "\n";
+ }
+ return ss.str();
+ }
+
+
+ class ChunkObjUnitTest : public UnitTest {
+ public:
+ void runShard(){
+
+ }
+
+ void run(){
+ runShard();
+ log(1) << "shardObjTest passed" << endl;
+ }
+ } shardObjTest;
+
+
+} // namespace mongo
diff --git a/s/chunk.h b/s/chunk.h
new file mode 100644
index 0000000..7395133
--- /dev/null
+++ b/s/chunk.h
@@ -0,0 +1,221 @@
+// shard.h
+
+/*
+ A "shard" is a database (replica pair typically) which represents
+ one partition of the overall database.
+*/
+
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#pragma once
+
+#include "../stdafx.h"
+#include "../client/dbclient.h"
+#include "../client/model.h"
+#include "shardkey.h"
+#include <boost/utility.hpp>
+#undef assert
+#define assert xassert
+
+namespace mongo {
+
+ class DBConfig;
+ class ChunkManager;
+ class ChunkObjUnitTest;
+
+ typedef unsigned long long ShardChunkVersion;
+
+ /**
+ config.chunks
+ { ns : "alleyinsider.fs.chunks" , min : {} , max : {} , server : "localhost:30001" }
+
+ x is in a shard iff
+ min <= x < max
+ */
+ class Chunk : public Model , boost::noncopyable {
+ public:
+
+ Chunk( ChunkManager * info );
+
+ const BSONObj& getMin() const { return _min; }
+ const BSONObj& getMax() const { return _max; }
+
+ void setMin(const BSONObj& o){
+ _min = o;
+ }
+ void setMax(const BSONObj& o){
+ _max = o;
+ }
+
+ string getShard(){
+ return _shard;
+ }
+ void setShard( string shard );
+
+ bool contains( const BSONObj& obj );
+
+ string toString() const;
+ operator string() const { return toString(); }
+
+ bool operator==(const Chunk& s);
+
+ bool operator!=(const Chunk& s){
+ return ! ( *this == s );
+ }
+
+ void getFilter( BSONObjBuilder& b );
+ BSONObj getFilter(){ BSONObjBuilder b; getFilter( b ); return b.obj(); }
+
+
+ BSONObj pickSplitPoint();
+ Chunk * split();
+ Chunk * split( const BSONObj& middle );
+
+ /**
+ * @return size of shard in bytes
+ * talks to mongod to do this
+ */
+ long getPhysicalSize();
+
+ long countObjects( const BSONObj& filter = BSONObj() );
+
+ /**
+ * if the amount of data written nears the max size of a shard
+ * then we check the real size, and if its too big, we split
+ */
+ bool splitIfShould( long dataWritten );
+
+
+ /*
+ * moves either this shard or newShard if it makes sense too
+ * @return whether or not a shard was moved
+ */
+ bool moveIfShould( Chunk * newShard = 0 );
+
+ bool moveAndCommit( const string& to , string& errmsg );
+
+ virtual const char * getNS(){ return "config.chunks"; }
+ virtual void serialize(BSONObjBuilder& to);
+ virtual void unserialize(const BSONObj& from);
+ virtual string modelServer();
+
+ virtual void save( bool check=false );
+
+ void ensureIndex();
+
+ void _markModified();
+
+ static long MaxChunkSize;
+
+ private:
+
+ // main shard info
+
+ ChunkManager * _manager;
+ ShardKeyPattern skey();
+
+ string _ns;
+ BSONObj _min;
+ BSONObj _max;
+ string _shard;
+ ShardChunkVersion _lastmod;
+
+ bool _modified;
+
+ // transient stuff
+
+ long _dataWritten;
+
+ // methods, etc..
+
+ void _split( BSONObj& middle );
+
+ friend class ChunkManager;
+ friend class ShardObjUnitTest;
+ };
+
+ /* config.sharding
+ { ns: 'alleyinsider.fs.chunks' ,
+ key: { ts : 1 } ,
+ shards: [ { min: 1, max: 100, server: a } , { min: 101, max: 200 , server : b } ]
+ }
+ */
+ class ChunkManager {
+ public:
+
+ ChunkManager( DBConfig * config , string ns , ShardKeyPattern pattern , bool unique );
+ virtual ~ChunkManager();
+
+ string getns(){
+ return _ns;
+ }
+
+ int numChunks(){ return _chunks.size(); }
+ Chunk* getChunk( int i ){ return _chunks[i]; }
+ bool hasShardKey( const BSONObj& obj );
+
+ Chunk& findChunk( const BSONObj& obj );
+ Chunk* findChunkOnServer( const string& server ) const;
+
+ ShardKeyPattern& getShardKey(){ return _key; }
+ bool isUnique(){ return _unique; }
+
+ /**
+ * makes sure the shard index is on all servers
+ */
+ void ensureIndex();
+
+ /**
+ * @return number of Chunk added to the vector
+ */
+ int getChunksForQuery( vector<Chunk*>& chunks , const BSONObj& query );
+
+ void getAllServers( set<string>& allServers );
+
+ void save();
+
+ string toString() const;
+ operator string() const { return toString(); }
+
+ ShardChunkVersion getVersion( const string& server ) const;
+ ShardChunkVersion getVersion() const;
+
+ /**
+ * this is just an increasing number of how many ChunkManagers we have so we know if something has been updated
+ */
+ unsigned long long getSequenceNumber(){
+ return _sequenceNumber;
+ }
+
+ void drop();
+
+ private:
+ DBConfig * _config;
+ string _ns;
+ ShardKeyPattern _key;
+ bool _unique;
+
+ vector<Chunk*> _chunks;
+ map<string,unsigned long long> _maxMarkers;
+
+ unsigned long long _sequenceNumber;
+
+ friend class Chunk;
+ static unsigned long long NextSequenceNumber;
+ };
+
+} // namespace mongo
diff --git a/s/commands_admin.cpp b/s/commands_admin.cpp
new file mode 100644
index 0000000..e79b529
--- /dev/null
+++ b/s/commands_admin.cpp
@@ -0,0 +1,696 @@
+// s/commands_admin.cpp
+
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+/* TODO
+ _ concurrency control.
+ _ limit() works right?
+ _ KillCursors
+
+ later
+ _ secondary indexes
+*/
+
+#include "stdafx.h"
+#include "../util/message.h"
+#include "../db/dbmessage.h"
+#include "../client/connpool.h"
+#include "../db/commands.h"
+
+#include "config.h"
+#include "chunk.h"
+#include "strategy.h"
+
+namespace mongo {
+
+ extern string ourHostname;
+
+ namespace dbgrid_cmds {
+
+ set<string> dbgridCommands;
+
+ class GridAdminCmd : public Command {
+ public:
+ GridAdminCmd( const char * n ) : Command( n ){
+ dbgridCommands.insert( n );
+ }
+ virtual bool slaveOk(){
+ return true;
+ }
+ virtual bool adminOnly() {
+ return true;
+ }
+ };
+
+ // --------------- misc commands ----------------------
+
+ class NetStatCmd : public GridAdminCmd {
+ public:
+ NetStatCmd() : GridAdminCmd("netstat") { }
+ virtual void help( stringstream& help ) const {
+ help << " shows status/reachability of servers in the cluster";
+ }
+ bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
+ result.append("configserver", configServer.getPrimary() );
+ result.append("isdbgrid", 1);
+ return true;
+ }
+ } netstat;
+
+ class ListGridCommands : public GridAdminCmd {
+ public:
+ ListGridCommands() : GridAdminCmd("gridcommands") { }
+ bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
+
+ BSONObjBuilder arr;
+ int num=0;
+ for ( set<string>::iterator i = dbgridCommands.begin(); i != dbgridCommands.end(); i++ ){
+ string s = BSONObjBuilder::numStr( num++ );
+ arr.append( s.c_str() , *i );
+ }
+
+ result.appendArray( "commands" , arr.done() );
+ return true;
+ }
+ } listGridCommands;
+
+ // ------------ database level commands -------------
+
+ class ListDatabaseCommand : public GridAdminCmd {
+ public:
+ ListDatabaseCommand() : GridAdminCmd("listdatabases") { }
+ bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
+ ScopedDbConnection conn( configServer.getPrimary() );
+
+ auto_ptr<DBClientCursor> cursor = conn->query( "config.databases" , BSONObj() );
+
+ BSONObjBuilder list;
+ int num = 0;
+ while ( cursor->more() ){
+ string s = BSONObjBuilder::numStr( num++ );
+
+ BSONObj o = cursor->next();
+ list.append( s.c_str() , o["name"].valuestrsafe() );
+ }
+
+ result.appendArray("databases" , list.obj() );
+ conn.done();
+
+ return true;
+ }
+ } gridListDatabase;
+
+ class MoveDatabasePrimaryCommand : public GridAdminCmd {
+ public:
+ MoveDatabasePrimaryCommand() : GridAdminCmd("moveprimary") { }
+ virtual void help( stringstream& help ) const {
+ help << " example: { moveprimary : 'foo' , to : 'localhost:9999' } TODO: locking? ";
+ }
+ bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
+ string dbname = cmdObj["moveprimary"].valuestrsafe();
+
+ if ( dbname.size() == 0 ){
+ errmsg = "no db";
+ return false;
+ }
+
+ if ( dbname == "config" ){
+ errmsg = "can't move config db";
+ return false;
+ }
+
+ DBConfig * config = grid.getDBConfig( dbname , false );
+ if ( ! config ){
+ errmsg = "can't find db!";
+ return false;
+ }
+
+ string to = cmdObj["to"].valuestrsafe();
+ if ( ! to.size() ){
+ errmsg = "you have to specify where you want to move it";
+ return false;
+ }
+
+ if ( to == config->getPrimary() ){
+ errmsg = "thats already the primary";
+ return false;
+ }
+
+ if ( ! grid.knowAboutShard( to ) ){
+ errmsg = "that server isn't known to me";
+ return false;
+ }
+
+ ScopedDbConnection conn( configServer.getPrimary() );
+
+ log() << "moving " << dbname << " primary from: " << config->getPrimary() << " to: " << to << endl;
+
+ // TODO LOCKING: this is not safe with multiple mongos
+
+
+ ScopedDbConnection toconn( to );
+
+ // TODO AARON - we need a clone command which replays operations from clone start to now
+ // using a seperate smaller oplog
+ BSONObj cloneRes;
+ bool worked = toconn->runCommand( dbname.c_str() , BSON( "clone" << config->getPrimary() ) , cloneRes );
+ toconn.done();
+ if ( ! worked ){
+ log() << "clone failed" << cloneRes << endl;
+ errmsg = "clone failed";
+ conn.done();
+ return false;
+ }
+
+ ScopedDbConnection fromconn( config->getPrimary() );
+
+ config->setPrimary( to );
+ config->save( true );
+
+ log() << " dropping " << dbname << " from old" << endl;
+
+ fromconn->dropDatabase( dbname.c_str() );
+ fromconn.done();
+
+ result << "primary" << to;
+
+ conn.done();
+ return true;
+ }
+ } movePrimary;
+
+ class EnableShardingCmd : public GridAdminCmd {
+ public:
+ EnableShardingCmd() : GridAdminCmd( "enablesharding" ){}
+ virtual void help( stringstream& help ) const {
+ help
+ << "Enable sharding for a db. (Use 'shardcollection' command afterwards.)\n"
+ << " { enablesharding : \"<dbname>\" }\n";
+ }
+ bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
+ string dbname = cmdObj["enablesharding"].valuestrsafe();
+ if ( dbname.size() == 0 ){
+ errmsg = "no db";
+ return false;
+ }
+
+ DBConfig * config = grid.getDBConfig( dbname );
+ if ( config->isShardingEnabled() ){
+ errmsg = "already enabled";
+ return false;
+ }
+
+ log() << "enabling sharding on: " << dbname << endl;
+
+ config->enableSharding();
+ config->save( true );
+
+ return true;
+ }
+ } enableShardingCmd;
+
+ // ------------ collection level commands -------------
+
+ class ShardCollectionCmd : public GridAdminCmd {
+ public:
+ ShardCollectionCmd() : GridAdminCmd( "shardcollection" ){}
+ virtual void help( stringstream& help ) const {
+ help
+ << "Shard a collection. Requires key. Optional unique. Sharding must already be enabled for the database.\n"
+ << " { enablesharding : \"<dbname>\" }\n";
+ }
+ bool run(const char *cmdns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
+ string ns = cmdObj["shardcollection"].valuestrsafe();
+ if ( ns.size() == 0 ){
+ errmsg = "no ns";
+ return false;
+ }
+
+ DBConfig * config = grid.getDBConfig( ns );
+ if ( ! config->isShardingEnabled() ){
+ errmsg = "sharding not enabled for db";
+ return false;
+ }
+
+ if ( config->isSharded( ns ) ){
+ errmsg = "already sharded";
+ return false;
+ }
+
+ BSONObj key = cmdObj.getObjectField( "key" );
+ if ( key.isEmpty() ){
+ errmsg = "no shard key";
+ return false;
+ } else if (key.nFields() > 1){
+ errmsg = "compound shard keys not supported yet";
+ return false;
+ }
+
+ if ( ns.find( ".system." ) != string::npos ){
+ errmsg = "can't shard system namespaces";
+ return false;
+ }
+
+ {
+ ScopedDbConnection conn( config->getPrimary() );
+ BSONObjBuilder b;
+ b.append( "ns" , ns );
+ b.appendBool( "unique" , true );
+ if ( conn->count( config->getName() + ".system.indexes" , b.obj() ) ){
+ errmsg = "can't shard collection with unique indexes";
+ conn.done();
+ return false;
+ }
+
+ BSONObj res = conn->findOne( config->getName() + ".system.namespaces" , BSON( "name" << ns ) );
+ if ( res["options"].type() == Object && res["options"].embeddedObject()["capped"].trueValue() ){
+ errmsg = "can't shard capped collection";
+ conn.done();
+ return false;
+ }
+
+ conn.done();
+ }
+
+ log() << "CMD: shardcollection: " << cmdObj << endl;
+
+ config->shardCollection( ns , key , cmdObj["unique"].trueValue() );
+ config->save( true );
+
+ result << "collectionsharded" << ns;
+ return true;
+ }
+ } shardCollectionCmd;
+
+ class GetShardVersion : public GridAdminCmd {
+ public:
+ GetShardVersion() : GridAdminCmd( "getShardVersion" ){}
+ virtual void help( stringstream& help ) const {
+ help << " example: { getShardVersion : 'alleyinsider.foo' } ";
+ }
+
+ bool run(const char *cmdns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
+ string ns = cmdObj["getShardVersion"].valuestrsafe();
+ if ( ns.size() == 0 ){
+ errmsg = "need to speciy fully namespace";
+ return false;
+ }
+
+ DBConfig * config = grid.getDBConfig( ns );
+ if ( ! config->isSharded( ns ) ){
+ errmsg = "ns not sharded.";
+ return false;
+ }
+
+ ChunkManager * cm = config->getChunkManager( ns );
+ if ( ! cm ){
+ errmsg = "no chunk manager?";
+ return false;
+ }
+
+ result.appendTimestamp( "version" , cm->getVersion() );
+
+ return 1;
+ }
+ } getShardVersionCmd;
+
+ class SplitCollectionHelper : public GridAdminCmd {
+ public:
+ SplitCollectionHelper( const char * name ) : GridAdminCmd( name ) , _name( name ){}
+ virtual void help( stringstream& help ) const {
+ help
+ << " example: { shard : 'alleyinsider.blog.posts' , find : { ts : 1 } } - split the shard that contains give key \n"
+ << " example: { shard : 'alleyinsider.blog.posts' , middle : { ts : 1 } } - split the shard that contains the key with this as the middle \n"
+ << " NOTE: this does not move move the chunks, it merely creates a logical seperation \n"
+ ;
+ }
+
+ virtual bool _split( BSONObjBuilder& result , string&errmsg , const string& ns , ChunkManager * manager , Chunk& old , BSONObj middle ) = 0;
+
+ bool run(const char *cmdns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
+ string ns = cmdObj[_name.c_str()].valuestrsafe();
+ if ( ns.size() == 0 ){
+ errmsg = "no ns";
+ return false;
+ }
+
+ DBConfig * config = grid.getDBConfig( ns );
+ if ( ! config->isSharded( ns ) ){
+ errmsg = "ns not sharded. have to shard before can split";
+ return false;
+ }
+
+ BSONObj find = cmdObj.getObjectField( "find" );
+ if ( find.isEmpty() ){
+ find = cmdObj.getObjectField( "middle" );
+
+ if ( find.isEmpty() ){
+ errmsg = "need to specify find or middle";
+ return false;
+ }
+ }
+
+ ChunkManager * info = config->getChunkManager( ns );
+ Chunk& old = info->findChunk( find );
+
+ return _split( result , errmsg , ns , info , old , cmdObj.getObjectField( "middle" ) );
+ }
+
+ protected:
+ string _name;
+ };
+
+ class SplitValueCommand : public SplitCollectionHelper {
+ public:
+ SplitValueCommand() : SplitCollectionHelper( "splitvalue" ){}
+ virtual bool _split( BSONObjBuilder& result , string& errmsg , const string& ns , ChunkManager * manager , Chunk& old , BSONObj middle ){
+
+ result << "shardinfo" << old.toString();
+
+ result.appendBool( "auto" , middle.isEmpty() );
+
+ if ( middle.isEmpty() )
+ middle = old.pickSplitPoint();
+
+ result.append( "middle" , middle );
+
+ return true;
+ }
+
+ } splitValueCmd;
+
+
+ class SplitCollection : public SplitCollectionHelper {
+ public:
+ SplitCollection() : SplitCollectionHelper( "split" ){}
+ virtual bool _split( BSONObjBuilder& result , string& errmsg , const string& ns , ChunkManager * manager , Chunk& old , BSONObj middle ){
+
+ log() << "splitting: " << ns << " shard: " << old << endl;
+
+ if ( middle.isEmpty() )
+ old.split();
+ else
+ old.split( middle );
+
+ return true;
+ }
+
+
+ } splitCollectionCmd;
+
+ class MoveChunkCmd : public GridAdminCmd {
+ public:
+ MoveChunkCmd() : GridAdminCmd( "movechunk" ){}
+ virtual void help( stringstream& help ) const {
+ help << "{ movechunk : 'test.foo' , find : { num : 1 } , to : 'localhost:30001' }";
+ }
+ bool run(const char *cmdns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
+ string ns = cmdObj["movechunk"].valuestrsafe();
+ if ( ns.size() == 0 ){
+ errmsg = "no ns";
+ return false;
+ }
+
+ DBConfig * config = grid.getDBConfig( ns );
+ if ( ! config->isSharded( ns ) ){
+ errmsg = "ns not sharded. have to shard before can move a chunk";
+ return false;
+ }
+
+ BSONObj find = cmdObj.getObjectField( "find" );
+ if ( find.isEmpty() ){
+ errmsg = "need to specify find. see help";
+ return false;
+ }
+
+ string to = cmdObj["to"].valuestrsafe();
+ if ( ! to.size() ){
+ errmsg = "you have to specify where you want to move the chunk";
+ return false;
+ }
+
+ log() << "CMD: movechunk: " << cmdObj << endl;
+
+ ChunkManager * info = config->getChunkManager( ns );
+ Chunk& c = info->findChunk( find );
+ string from = c.getShard();
+
+ if ( from == to ){
+ errmsg = "that chunk is already on that shard";
+ return false;
+ }
+
+ if ( ! grid.knowAboutShard( to ) ){
+ errmsg = "that shard isn't known to me";
+ return false;
+ }
+
+ if ( ! c.moveAndCommit( to , errmsg ) )
+ return false;
+
+ return true;
+ }
+ } moveChunkCmd;
+
+ // ------------ server level commands -------------
+
+ class ListShardsCmd : public GridAdminCmd {
+ public:
+ ListShardsCmd() : GridAdminCmd("listshards") { }
+ virtual void help( stringstream& help ) const {
+ help << "list all shards of the system";
+ }
+ bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
+ ScopedDbConnection conn( configServer.getPrimary() );
+
+ vector<BSONObj> all;
+ auto_ptr<DBClientCursor> cursor = conn->query( "config.shards" , BSONObj() );
+ while ( cursor->more() ){
+ BSONObj o = cursor->next();
+ all.push_back( o );
+ }
+
+ result.append("shards" , all );
+ conn.done();
+
+ return true;
+ }
+ } listShardsCmd;
+
+ /* a shard is a single mongod server or a replica pair. add it (them) to the cluster as a storage partition. */
+ class AddShard : public GridAdminCmd {
+ public:
+ AddShard() : GridAdminCmd("addshard") { }
+ virtual void help( stringstream& help ) const {
+ help << "add a new shard to the system";
+ }
+ bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
+ ScopedDbConnection conn( configServer.getPrimary() );
+
+
+ string host = cmdObj["addshard"].valuestrsafe();
+
+ if ( host == "localhost" || host.find( "localhost:" ) == 0 ||
+ host == "127.0.0.1" || host.find( "127.0.0.1:" ) == 0 ){
+ if ( cmdObj["allowLocal"].type() != Bool ||
+ ! cmdObj["allowLocal"].boolean() ){
+ errmsg =
+ "can't use localhost as a shard since all shards need to communicate. "
+ "allowLocal to override for testing";
+ return false;
+ }
+ }
+
+ if ( host.find( ":" ) == string::npos ){
+ stringstream ss;
+ ss << host << ":" << CmdLine::ShardServerPort;
+ host = ss.str();
+ }
+
+ BSONObj shard;
+ {
+ BSONObjBuilder b;
+ b.append( "host" , host );
+ if ( cmdObj["maxSize"].isNumber() )
+ b.append( cmdObj["maxSize"] );
+ shard = b.obj();
+ }
+
+ BSONObj old = conn->findOne( "config.shards" , shard );
+ if ( ! old.isEmpty() ){
+ result.append( "msg" , "already exists" );
+ conn.done();
+ return false;
+ }
+
+ try {
+ ScopedDbConnection newShardConn( host );
+ newShardConn->getLastError();
+ newShardConn.done();
+ }
+ catch ( DBException& e ){
+ errmsg = "couldn't connect to new shard";
+ result.append( "host" , host );
+ result.append( "exception" , e.what() );
+ conn.done();
+ return false;
+ }
+
+
+
+ conn->insert( "config.shards" , shard );
+ result.append( "added" , shard["host"].valuestrsafe() );
+ conn.done();
+ return true;
+ }
+ } addServer;
+
+ class RemoveShardCmd : public GridAdminCmd {
+ public:
+ RemoveShardCmd() : GridAdminCmd("removeshard") { }
+ virtual void help( stringstream& help ) const {
+ help << "remove a shard to the system.\nshard must be empty or command will return an error.";
+ }
+ bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
+ if ( 1 ){
+ errmsg = "removeshard not yet implemented";
+ return 0;
+ }
+
+ ScopedDbConnection conn( configServer.getPrimary() );
+
+ BSONObj server = BSON( "host" << cmdObj["removeshard"].valuestrsafe() );
+ conn->remove( "config.shards" , server );
+
+ conn.done();
+ return true;
+ }
+ } removeShardCmd;
+
+
+ // --------------- public commands ----------------
+
+ class IsDbGridCmd : public Command {
+ public:
+ virtual bool slaveOk() {
+ return true;
+ }
+ IsDbGridCmd() : Command("isdbgrid") { }
+ bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) {
+ result.append("isdbgrid", 1);
+ result.append("hostname", ourHostname);
+ return true;
+ }
+ } isdbgrid;
+
+ class CmdIsMaster : public Command {
+ public:
+ virtual bool requiresAuth() { return false; }
+ virtual bool slaveOk() {
+ return true;
+ }
+ virtual void help( stringstream& help ) const {
+ help << "test if this is master half of a replica pair";
+ }
+ CmdIsMaster() : Command("ismaster") { }
+ virtual bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) {
+ result.append("ismaster", 1.0 );
+ result.append("msg", "isdbgrid");
+ return true;
+ }
+ } ismaster;
+
+ class CmdShardingGetPrevError : public Command {
+ public:
+ virtual bool requiresAuth() { return false; }
+ virtual bool slaveOk() {
+ return true;
+ }
+ virtual void help( stringstream& help ) const {
+ help << "get previous error (since last reseterror command)";
+ }
+ CmdShardingGetPrevError() : Command("getpreverror") { }
+ virtual bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) {
+ errmsg += "getpreverror not supported for sharded environments";
+ return false;
+ }
+ } cmdGetPrevError;
+
+ class CmdShardingGetLastError : public Command {
+ public:
+ virtual bool requiresAuth() { return false; }
+ virtual bool slaveOk() {
+ return true;
+ }
+ virtual void help( stringstream& help ) const {
+ help << "check for an error on the last command executed";
+ }
+ CmdShardingGetLastError() : Command("getlasterror") { }
+ virtual bool run(const char *nsraw, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) {
+ string dbName = nsraw;
+ dbName = dbName.substr( 0 , dbName.size() - 5 );
+
+ DBConfig * conf = grid.getDBConfig( dbName , false );
+
+ ClientInfo * client = ClientInfo::get();
+ set<string> * shards = client->getPrev();
+
+ if ( shards->size() == 0 ){
+ result.appendNull( "err" );
+ return true;
+ }
+
+ if ( shards->size() == 1 ){
+ string theShard = *(shards->begin() );
+ result.append( "theshard" , theShard.c_str() );
+ ScopedDbConnection conn( theShard );
+ BSONObj res;
+ bool ok = conn->runCommand( conf->getName() , cmdObj , res );
+ result.appendElements( res );
+ conn.done();
+ return ok;
+ }
+
+ vector<string> errors;
+ for ( set<string>::iterator i = shards->begin(); i != shards->end(); i++ ){
+ string theShard = *i;
+ ScopedDbConnection conn( theShard );
+ string temp = conn->getLastError();
+ if ( temp.size() )
+ errors.push_back( temp );
+ conn.done();
+ }
+
+ if ( errors.size() == 0 ){
+ result.appendNull( "err" );
+ return true;
+ }
+
+ result.append( "err" , errors[0].c_str() );
+
+ BSONObjBuilder all;
+ for ( unsigned i=0; i<errors.size(); i++ ){
+ all.append( all.numStr( i ).c_str() , errors[i].c_str() );
+ }
+ result.appendArray( "errs" , all.obj() );
+ return true;
+ }
+ } cmdGetLastError;
+
+ }
+
+} // namespace mongo
diff --git a/s/commands_public.cpp b/s/commands_public.cpp
new file mode 100644
index 0000000..2d3de7a
--- /dev/null
+++ b/s/commands_public.cpp
@@ -0,0 +1,368 @@
+// s/commands_public.cpp
+
+
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "stdafx.h"
+#include "../util/message.h"
+#include "../db/dbmessage.h"
+#include "../client/connpool.h"
+#include "../client/parallel.h"
+#include "../db/commands.h"
+
+#include "config.h"
+#include "chunk.h"
+#include "strategy.h"
+
+namespace mongo {
+
+ namespace dbgrid_pub_cmds {
+
+ class PublicGridCommand : public Command {
+ public:
+ PublicGridCommand( const char * n ) : Command( n ){
+ }
+ virtual bool slaveOk(){
+ return true;
+ }
+ virtual bool adminOnly() {
+ return false;
+ }
+ protected:
+ string getDBName( string ns ){
+ return ns.substr( 0 , ns.size() - 5 );
+ }
+
+ bool passthrough( DBConfig * conf, const BSONObj& cmdObj , BSONObjBuilder& result ){
+ ScopedDbConnection conn( conf->getPrimary() );
+ BSONObj res;
+ bool ok = conn->runCommand( conf->getName() , cmdObj , res );
+ result.appendElements( res );
+ conn.done();
+ return ok;
+ }
+ };
+
+ class NotAllowedOnShardedCollectionCmd : public PublicGridCommand {
+ public:
+ NotAllowedOnShardedCollectionCmd( const char * n ) : PublicGridCommand( n ){}
+
+ virtual string getFullNS( const string& dbName , const BSONObj& cmdObj ) = 0;
+
+ virtual bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
+
+ string dbName = getDBName( ns );
+ string fullns = getFullNS( dbName , cmdObj );
+
+ DBConfig * conf = grid.getDBConfig( dbName , false );
+
+ if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ){
+ return passthrough( conf , cmdObj , result );
+ }
+ errmsg = "can't do command: " + name + " on sharded collection";
+ return false;
+ }
+ };
+
+ // ----
+
+ class DropCmd : public PublicGridCommand {
+ public:
+ DropCmd() : PublicGridCommand( "drop" ){}
+ bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
+
+ string dbName = getDBName( ns );
+ string collection = cmdObj.firstElement().valuestrsafe();
+ string fullns = dbName + "." + collection;
+
+ DBConfig * conf = grid.getDBConfig( dbName , false );
+
+ log() << "DROP: " << fullns << endl;
+
+ if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ){
+ return passthrough( conf , cmdObj , result );
+ }
+
+ ChunkManager * cm = conf->getChunkManager( fullns );
+ massert( 10418 , "how could chunk manager be null!" , cm );
+
+ cm->drop();
+
+ return 1;
+ }
+ } dropCmd;
+
+ class DropDBCmd : public PublicGridCommand {
+ public:
+ DropDBCmd() : PublicGridCommand( "dropDatabase" ){}
+ bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
+
+ BSONElement e = cmdObj.firstElement();
+
+ if ( ! e.isNumber() || e.number() != 1 ){
+ errmsg = "invalid params";
+ return 0;
+ }
+
+ string dbName = getDBName( ns );
+ DBConfig * conf = grid.getDBConfig( dbName , false );
+
+ log() << "DROP DATABASE: " << dbName << endl;
+
+ if ( ! conf || ! conf->isShardingEnabled() ){
+ log(1) << " passing though drop database for: " << dbName << endl;
+ return passthrough( conf , cmdObj , result );
+ }
+
+ if ( ! conf->dropDatabase( errmsg ) )
+ return false;
+
+ result.append( "dropped" , dbName );
+ return true;
+ }
+ } dropDBCmd;
+
+ class CountCmd : public PublicGridCommand {
+ public:
+ CountCmd() : PublicGridCommand("count") { }
+ bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
+
+ string dbName = getDBName( ns );
+ string collection = cmdObj.firstElement().valuestrsafe();
+ string fullns = dbName + "." + collection;
+
+ BSONObj filter = cmdObj["query"].embeddedObject();
+
+ DBConfig * conf = grid.getDBConfig( dbName , false );
+
+ if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ){
+ ScopedDbConnection conn( conf->getPrimary() );
+ result.append( "n" , (double)conn->count( fullns , filter ) );
+ conn.done();
+ return true;
+ }
+
+ ChunkManager * cm = conf->getChunkManager( fullns );
+ massert( 10419 , "how could chunk manager be null!" , cm );
+
+ vector<Chunk*> chunks;
+ cm->getChunksForQuery( chunks , filter );
+
+ unsigned long long total = 0;
+ for ( vector<Chunk*>::iterator i = chunks.begin() ; i != chunks.end() ; i++ ){
+ Chunk * c = *i;
+ total += c->countObjects( filter );
+ }
+
+ result.append( "n" , (double)total );
+ return true;
+ }
+ } countCmd;
+
+ class ConvertToCappedCmd : public NotAllowedOnShardedCollectionCmd {
+ public:
+ ConvertToCappedCmd() : NotAllowedOnShardedCollectionCmd("convertToCapped"){}
+
+ virtual string getFullNS( const string& dbName , const BSONObj& cmdObj ){
+ return dbName + "." + cmdObj.firstElement().valuestrsafe();
+ }
+
+ } convertToCappedCmd;
+
+
+ class GroupCmd : public NotAllowedOnShardedCollectionCmd {
+ public:
+ GroupCmd() : NotAllowedOnShardedCollectionCmd("group"){}
+
+ virtual string getFullNS( const string& dbName , const BSONObj& cmdObj ){
+ return dbName + "." + cmdObj.firstElement().embeddedObjectUserCheck()["ns"].valuestrsafe();
+ }
+
+ } groupCmd;
+
+ class DistinctCmd : public PublicGridCommand {
+ public:
+ DistinctCmd() : PublicGridCommand("distinct"){}
+ virtual void help( stringstream &help ) const {
+ help << "{ distinct : 'collection name' , key : 'a.b' }";
+ }
+ bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
+
+ string dbName = getDBName( ns );
+ string collection = cmdObj.firstElement().valuestrsafe();
+ string fullns = dbName + "." + collection;
+
+ DBConfig * conf = grid.getDBConfig( dbName , false );
+
+ if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ){
+ return passthrough( conf , cmdObj , result );
+ }
+
+ ChunkManager * cm = conf->getChunkManager( fullns );
+ massert( 10420 , "how could chunk manager be null!" , cm );
+
+ vector<Chunk*> chunks;
+ cm->getChunksForQuery( chunks , BSONObj() );
+
+ set<BSONObj,BSONObjCmp> all;
+ int size = 32;
+
+ for ( vector<Chunk*>::iterator i = chunks.begin() ; i != chunks.end() ; i++ ){
+ Chunk * c = *i;
+
+ ScopedDbConnection conn( c->getShard() );
+ BSONObj res;
+ bool ok = conn->runCommand( conf->getName() , cmdObj , res );
+ conn.done();
+
+ if ( ! ok ){
+ result.appendElements( res );
+ return false;
+ }
+
+ BSONObjIterator it( res["values"].embeddedObjectUserCheck() );
+ while ( it.more() ){
+ BSONElement nxt = it.next();
+ BSONObjBuilder temp(32);
+ temp.appendAs( nxt , "x" );
+ all.insert( temp.obj() );
+ }
+
+ }
+
+ BSONObjBuilder b( size );
+ int n=0;
+ for ( set<BSONObj,BSONObjCmp>::iterator i = all.begin() ; i != all.end(); i++ ){
+ b.appendAs( i->firstElement() , b.numStr( n++ ).c_str() );
+ }
+
+ result.appendArray( "values" , b.obj() );
+ return true;
+ }
+ } disinctCmd;
+
+ class MRCmd : public PublicGridCommand {
+ public:
+ MRCmd() : PublicGridCommand( "mapreduce" ){}
+
+ string getTmpName( const string& coll ){
+ static int inc = 1;
+ stringstream ss;
+ ss << "tmp.mrs." << coll << "_" << time(0) << "_" << inc++;
+ return ss.str();
+ }
+
+ BSONObj fixForShards( const BSONObj& orig , const string& output ){
+ BSONObjBuilder b;
+ BSONObjIterator i( orig );
+ while ( i.more() ){
+ BSONElement e = i.next();
+ string fn = e.fieldName();
+ if ( fn == "map" ||
+ fn == "mapreduce" ||
+ fn == "reduce" ||
+ fn == "query" ||
+ fn == "sort" ||
+ fn == "verbose" ){
+ b.append( e );
+ }
+ else if ( fn == "keeptemp" ||
+ fn == "out" ||
+ fn == "finalize" ){
+ // we don't want to copy these
+ }
+ else {
+ uassert( 10177 , (string)"don't know mr field: " + fn , 0 );
+ }
+ }
+ b.append( "out" , output );
+ return b.obj();
+ }
+
+ bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
+ Timer t;
+
+ string dbName = getDBName( ns );
+ string collection = cmdObj.firstElement().valuestrsafe();
+ string fullns = dbName + "." + collection;
+
+ DBConfig * conf = grid.getDBConfig( dbName , false );
+
+ if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ){
+ return passthrough( conf , cmdObj , result );
+ }
+
+ BSONObjBuilder timingBuilder;
+
+ ChunkManager * cm = conf->getChunkManager( fullns );
+
+ BSONObj q;
+ if ( cmdObj["query"].type() == Object ){
+ q = cmdObj["query"].embeddedObjectUserCheck();
+ }
+
+ vector<Chunk*> chunks;
+ cm->getChunksForQuery( chunks , q );
+
+ const string shardedOutputCollection = getTmpName( collection );
+
+ BSONObj shardedCommand = fixForShards( cmdObj , shardedOutputCollection );
+
+ BSONObjBuilder finalCmd;
+ finalCmd.append( "mapreduce.shardedfinish" , cmdObj );
+ finalCmd.append( "shardedOutputCollection" , shardedOutputCollection );
+
+ list< shared_ptr<Future::CommandResult> > futures;
+
+ for ( vector<Chunk*>::iterator i = chunks.begin() ; i != chunks.end() ; i++ ){
+ Chunk * c = *i;
+ futures.push_back( Future::spawnCommand( c->getShard() , dbName , shardedCommand ) );
+ }
+
+ BSONObjBuilder shardresults;
+ for ( list< shared_ptr<Future::CommandResult> >::iterator i=futures.begin(); i!=futures.end(); i++ ){
+ shared_ptr<Future::CommandResult> res = *i;
+ if ( ! res->join() ){
+ errmsg = "mongod mr failed: ";
+ errmsg += res->result().toString();
+ return 0;
+ }
+ shardresults.append( res->getServer() , res->result() );
+ }
+
+ finalCmd.append( "shards" , shardresults.obj() );
+ timingBuilder.append( "shards" , t.millis() );
+
+ Timer t2;
+ ScopedDbConnection conn( conf->getPrimary() );
+ BSONObj finalResult;
+ if ( ! conn->runCommand( dbName , finalCmd.obj() , finalResult ) ){
+ errmsg = "final reduce failed: ";
+ errmsg += finalResult.toString();
+ return 0;
+ }
+ timingBuilder.append( "final" , t2.millis() );
+
+ result.appendElements( finalResult );
+ result.append( "timeMillis" , t.millis() );
+ result.append( "timing" , timingBuilder.obj() );
+
+ return 1;
+ }
+ } mrCmd;
+ }
+}
diff --git a/s/config.cpp b/s/config.cpp
new file mode 100644
index 0000000..0bfb5a3
--- /dev/null
+++ b/s/config.cpp
@@ -0,0 +1,535 @@
+// config.cpp
+
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "stdafx.h"
+#include "../util/message.h"
+#include "../util/unittest.h"
+#include "../client/connpool.h"
+#include "../client/model.h"
+#include "../db/pdfile.h"
+#include "../db/cmdline.h"
+
+#include "server.h"
+#include "config.h"
+#include "chunk.h"
+
+namespace mongo {
+
+ int ConfigServer::VERSION = 2;
+
+ /* --- DBConfig --- */
+
+ string DBConfig::modelServer() {
+ return configServer.modelServer();
+ }
+
+ bool DBConfig::isSharded( const string& ns ){
+ if ( ! _shardingEnabled )
+ return false;
+ return _sharded.find( ns ) != _sharded.end();
+ }
+
+ string DBConfig::getShard( const string& ns ){
+ if ( isSharded( ns ) )
+ return "";
+
+ uassert( 10178 , "no primary!" , _primary.size() );
+ return _primary;
+ }
+
+ void DBConfig::enableSharding(){
+ _shardingEnabled = true;
+ }
+
+ ChunkManager* DBConfig::shardCollection( const string& ns , ShardKeyPattern fieldsAndOrder , bool unique ){
+ if ( ! _shardingEnabled )
+ throw UserException( 8042 , "db doesn't have sharding enabled" );
+
+ ChunkManager * info = _shards[ns];
+ if ( info )
+ return info;
+
+ if ( isSharded( ns ) )
+ throw UserException( 8043 , "already sharded" );
+
+ log() << "enable sharding on: " << ns << " with shard key: " << fieldsAndOrder << endl;
+ _sharded[ns] = CollectionInfo( fieldsAndOrder , unique );
+
+ info = new ChunkManager( this , ns , fieldsAndOrder , unique );
+ _shards[ns] = info;
+ return info;
+
+ }
+
+ bool DBConfig::removeSharding( const string& ns ){
+ if ( ! _shardingEnabled ){
+ cout << "AAAA" << endl;
+ return false;
+ }
+
+ ChunkManager * info = _shards[ns];
+ map<string,CollectionInfo>::iterator i = _sharded.find( ns );
+
+ if ( info == 0 && i == _sharded.end() ){
+ cout << "BBBB" << endl;
+ return false;
+ }
+ uassert( 10179 , "_sharded but no info" , info );
+ uassert( 10180 , "info but no sharded" , i != _sharded.end() );
+
+ _sharded.erase( i );
+ _shards.erase( ns ); // TODO: clean this up, maybe switch to shared_ptr
+ return true;
+ }
+
+ ChunkManager* DBConfig::getChunkManager( const string& ns , bool reload ){
+ ChunkManager* m = _shards[ns];
+ if ( m && ! reload )
+ return m;
+
+ uassert( 10181 , (string)"not sharded:" + ns , isSharded( ns ) );
+ if ( m && reload )
+ log() << "reloading shard info for: " << ns << endl;
+ m = new ChunkManager( this , ns , _sharded[ ns ].key , _sharded[ns].unique );
+ _shards[ns] = m;
+ return m;
+ }
+
+ void DBConfig::serialize(BSONObjBuilder& to){
+ to.append("name", _name);
+ to.appendBool("partitioned", _shardingEnabled );
+ to.append("primary", _primary );
+
+ if ( _sharded.size() > 0 ){
+ BSONObjBuilder a;
+ for ( map<string,CollectionInfo>::reverse_iterator i=_sharded.rbegin(); i != _sharded.rend(); i++){
+ BSONObjBuilder temp;
+ temp.append( "key" , i->second.key.key() );
+ temp.appendBool( "unique" , i->second.unique );
+ a.append( i->first.c_str() , temp.obj() );
+ }
+ to.append( "sharded" , a.obj() );
+ }
+ }
+
+ void DBConfig::unserialize(const BSONObj& from){
+ _name = from.getStringField("name");
+ _shardingEnabled = from.getBoolField("partitioned");
+ _primary = from.getStringField("primary");
+
+ _sharded.clear();
+ BSONObj sharded = from.getObjectField( "sharded" );
+ if ( ! sharded.isEmpty() ){
+ BSONObjIterator i(sharded);
+ while ( i.more() ){
+ BSONElement e = i.next();
+ uassert( 10182 , "sharded things have to be objects" , e.type() == Object );
+ BSONObj c = e.embeddedObject();
+ uassert( 10183 , "key has to be an object" , c["key"].type() == Object );
+ _sharded[e.fieldName()] = CollectionInfo( c["key"].embeddedObject() ,
+ c["unique"].trueValue() );
+ }
+ }
+ }
+
+ void DBConfig::save( bool check ){
+ Model::save( check );
+ for ( map<string,ChunkManager*>::iterator i=_shards.begin(); i != _shards.end(); i++)
+ i->second->save();
+ }
+
+ bool DBConfig::reload(){
+ // TODO: i don't think is 100% correct
+ return doload();
+ }
+
+ bool DBConfig::doload(){
+ BSONObjBuilder b;
+ b.append("name", _name.c_str());
+ BSONObj q = b.done();
+ return load(q);
+ }
+
+ bool DBConfig::dropDatabase( string& errmsg ){
+ /**
+ * 1) make sure everything is up
+ * 2) update config server
+ * 3) drop and reset sharded collections
+ * 4) drop and reset primary
+ * 5) drop everywhere to clean up loose ends
+ */
+
+ log() << "DBConfig::dropDatabase: " << _name << endl;
+
+ // 1
+ if ( ! configServer.allUp( errmsg ) ){
+ log(1) << "\t DBConfig::dropDatabase not all up" << endl;
+ return 0;
+ }
+
+ // 2
+ grid.removeDB( _name );
+ remove( true );
+ if ( ! configServer.allUp( errmsg ) ){
+ log() << "error removing from config server even after checking!" << endl;
+ return 0;
+ }
+ log(1) << "\t removed entry from config server for: " << _name << endl;
+
+ set<string> allServers;
+
+ // 3
+ while ( true ){
+ int num;
+ if ( ! _dropShardedCollections( num , allServers , errmsg ) )
+ return 0;
+ log() << " DBConfig::dropDatabase: " << _name << " dropped sharded collections: " << num << endl;
+ if ( num == 0 )
+ break;
+ }
+
+ // 4
+ {
+ ScopedDbConnection conn( _primary );
+ BSONObj res;
+ if ( ! conn->dropDatabase( _name , &res ) ){
+ errmsg = res.toString();
+ return 0;
+ }
+ conn.done();
+ }
+
+ // 5
+ for ( set<string>::iterator i=allServers.begin(); i!=allServers.end(); i++ ){
+ string s = *i;
+ ScopedDbConnection conn( s );
+ BSONObj res;
+ if ( ! conn->dropDatabase( _name , &res ) ){
+ errmsg = res.toString();
+ return 0;
+ }
+ conn.done();
+ }
+
+ log(1) << "\t dropped primary db for: " << _name << endl;
+
+ return true;
+ }
+
+ bool DBConfig::_dropShardedCollections( int& num, set<string>& allServers , string& errmsg ){
+ num = 0;
+ set<string> seen;
+ while ( true ){
+ map<string,ChunkManager*>::iterator i = _shards.begin();
+
+ if ( i == _shards.end() )
+ break;
+
+ if ( seen.count( i->first ) ){
+ errmsg = "seen a collection twice!";
+ return false;
+ }
+
+ seen.insert( i->first );
+ log(1) << "\t dropping sharded collection: " << i->first << endl;
+
+ i->second->getAllServers( allServers );
+ i->second->drop();
+
+ num++;
+ uassert( 10184 , "_dropShardedCollections too many collections - bailing" , num < 100000 );
+ log(2) << "\t\t dropped " << num << " so far" << endl;
+ }
+ return true;
+ }
+
+ /* --- Grid --- */
+
+ string Grid::pickShardForNewDB(){
+ ScopedDbConnection conn( configServer.getPrimary() );
+
+ // TODO: this is temporary
+
+ vector<string> all;
+ auto_ptr<DBClientCursor> c = conn->query( "config.shards" , Query() );
+ while ( c->more() ){
+ BSONObj s = c->next();
+ all.push_back( s["host"].valuestrsafe() );
+ // look at s["maxSize"] if exists
+ }
+ conn.done();
+
+ if ( all.size() == 0 )
+ return "";
+
+ return all[ rand() % all.size() ];
+ }
+
+ bool Grid::knowAboutShard( string name ) const{
+ ScopedDbConnection conn( configServer.getPrimary() );
+ BSONObj shard = conn->findOne( "config.shards" , BSON( "host" << name ) );
+ conn.done();
+ return ! shard.isEmpty();
+ }
+
+ DBConfig* Grid::getDBConfig( string database , bool create ){
+ {
+ string::size_type i = database.find( "." );
+ if ( i != string::npos )
+ database = database.substr( 0 , i );
+ }
+
+ if ( database == "config" )
+ return &configServer;
+
+ boostlock l( _lock );
+
+ DBConfig*& cc = _databases[database];
+ if ( cc == 0 ){
+ cc = new DBConfig( database );
+ if ( ! cc->doload() ){
+ if ( create ){
+ // note here that cc->primary == 0.
+ log() << "couldn't find database [" << database << "] in config db" << endl;
+
+ if ( database == "admin" )
+ cc->_primary = configServer.getPrimary();
+ else
+ cc->_primary = pickShardForNewDB();
+
+ if ( cc->_primary.size() ){
+ cc->save();
+ log() << "\t put [" << database << "] on: " << cc->_primary << endl;
+ }
+ else {
+ log() << "\t can't find a shard to put new db on" << endl;
+ uassert( 10185 , "can't find a shard to put new db on" , 0 );
+ }
+ }
+ else {
+ cc = 0;
+ }
+ }
+
+ }
+
+ return cc;
+ }
+
+ void Grid::removeDB( string database ){
+ uassert( 10186 , "removeDB expects db name" , database.find( '.' ) == string::npos );
+ boostlock l( _lock );
+ _databases.erase( database );
+
+ }
+
+ unsigned long long Grid::getNextOpTime() const {
+ ScopedDbConnection conn( configServer.getPrimary() );
+
+ BSONObj result;
+ massert( 10421 , "getoptime failed" , conn->simpleCommand( "admin" , &result , "getoptime" ) );
+ conn.done();
+
+ return result["optime"]._numberLong();
+ }
+
+ /* --- ConfigServer ---- */
+
+ ConfigServer::ConfigServer() {
+ _shardingEnabled = false;
+ _primary = "";
+ _name = "grid";
+ }
+
+ ConfigServer::~ConfigServer() {
+ }
+
+ bool ConfigServer::init( vector<string> configHosts ){
+ uassert( 10187 , "need configdbs" , configHosts.size() );
+
+ string hn = getHostName();
+ if ( hn.empty() ) {
+ sleepsecs(5);
+ dbexit( EXIT_BADOPTIONS );
+ }
+ ourHostname = hn;
+
+ set<string> hosts;
+ for ( size_t i=0; i<configHosts.size(); i++ ){
+ string host = configHosts[i];
+ hosts.insert( getHost( host , false ) );
+ configHosts[i] = getHost( host , true );
+ }
+
+ for ( set<string>::iterator i=hosts.begin(); i!=hosts.end(); i++ ){
+ string host = *i;
+ bool ok = false;
+ for ( int x=0; x<10; x++ ){
+ if ( ! hostbyname( host.c_str() ).empty() ){
+ ok = true;
+ break;
+ }
+ log() << "can't resolve DNS for [" << host << "] sleeping and trying " << (10-x) << " more times" << endl;
+ sleepsecs( 10 );
+ }
+ if ( ! ok )
+ return false;
+ }
+
+ uassert( 10188 , "can only hand 1 config db right now" , configHosts.size() == 1 );
+ _primary = configHosts[0];
+
+ return true;
+ }
+
+ bool ConfigServer::allUp(){
+ string errmsg;
+ return allUp( errmsg );
+ }
+
+ bool ConfigServer::allUp( string& errmsg ){
+ try {
+ ScopedDbConnection conn( _primary );
+ conn->getLastError();
+ conn.done();
+ return true;
+ }
+ catch ( DBException& ){
+ log() << "ConfigServer::allUp : " << _primary << " seems down!" << endl;
+ errmsg = _primary + " seems down";
+ return false;
+ }
+
+ }
+
+ int ConfigServer::dbConfigVersion(){
+ ScopedDbConnection conn( _primary );
+ int version = dbConfigVersion( conn.conn() );
+ conn.done();
+ return version;
+ }
+
+ int ConfigServer::dbConfigVersion( DBClientBase& conn ){
+ auto_ptr<DBClientCursor> c = conn.query( "config.version" , BSONObj() );
+ int version = 0;
+ if ( c->more() ){
+ BSONObj o = c->next();
+ version = o["version"].numberInt();
+ uassert( 10189 , "should only have 1 thing in config.version" , ! c->more() );
+ }
+ else {
+ if ( conn.count( "config.shard" ) || conn.count( "config.databases" ) ){
+ version = 1;
+ }
+ }
+
+ return version;
+ }
+
+ int ConfigServer::checkConfigVersion(){
+ int cur = dbConfigVersion();
+ if ( cur == VERSION )
+ return 0;
+
+ if ( cur == 0 ){
+ ScopedDbConnection conn( _primary );
+ conn->insert( "config.version" , BSON( "version" << VERSION ) );
+ pool.flush();
+ assert( VERSION == dbConfigVersion( conn.conn() ) );
+ conn.done();
+ return 0;
+ }
+
+ log() << "don't know how to upgrade " << cur << " to " << VERSION << endl;
+ return -8;
+ }
+
+ string ConfigServer::getHost( string name , bool withPort ){
+ if ( name.find( ":" ) ){
+ if ( withPort )
+ return name;
+ return name.substr( 0 , name.find( ":" ) );
+ }
+
+ if ( withPort ){
+ stringstream ss;
+ ss << name << ":" << CmdLine::ConfigServerPort;
+ return ss.str();
+ }
+
+ return name;
+ }
+
+ ConfigServer configServer;
+ Grid grid;
+
+
+ class DBConfigUnitTest : public UnitTest {
+ public:
+ void testInOut( DBConfig& c , BSONObj o ){
+ c.unserialize( o );
+ BSONObjBuilder b;
+ c.serialize( b );
+
+ BSONObj out = b.obj();
+
+ if ( o.toString() == out.toString() )
+ return;
+
+ log() << "DBConfig serialization broken\n"
+ << "in : " << o.toString() << "\n"
+ << "out : " << out.toString()
+ << endl;
+ assert(0);
+ }
+
+ void a(){
+ BSONObjBuilder b;
+ b << "name" << "abc";
+ b.appendBool( "partitioned" , true );
+ b << "primary" << "myserver";
+
+ DBConfig c;
+ testInOut( c , b.obj() );
+ }
+
+ void b(){
+ BSONObjBuilder b;
+ b << "name" << "abc";
+ b.appendBool( "partitioned" , true );
+ b << "primary" << "myserver";
+
+ BSONObjBuilder a;
+ a << "abc.foo" << fromjson( "{ 'key' : { 'a' : 1 } , 'unique' : false }" );
+ a << "abc.bar" << fromjson( "{ 'key' : { 'kb' : -1 } , 'unique' : true }" );
+
+ b.appendArray( "sharded" , a.obj() );
+
+ DBConfig c;
+ testInOut( c , b.obj() );
+ assert( c.isSharded( "abc.foo" ) );
+ assert( ! c.isSharded( "abc.food" ) );
+ }
+
+ void run(){
+ a();
+ b();
+ }
+
+ } dbConfigUnitTest;
+}
diff --git a/s/config.h b/s/config.h
new file mode 100644
index 0000000..16aa67a
--- /dev/null
+++ b/s/config.h
@@ -0,0 +1,195 @@
+// config.h
+
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+/* This file is things related to the "grid configuration":
+ - what machines make up the db component of our cloud
+ - where various ranges of things live
+*/
+
+#pragma once
+
+#include "../db/namespace.h"
+#include "../client/dbclient.h"
+#include "../client/model.h"
+#include "shardkey.h"
+
+namespace mongo {
+
+ class Grid;
+ class ConfigServer;
+
+ extern ConfigServer configServer;
+ extern Grid grid;
+
+ class ChunkManager;
+
+ class CollectionInfo {
+ public:
+ CollectionInfo( ShardKeyPattern _key = BSONObj() , bool _unique = false ) :
+ key( _key ) , unique( _unique ){}
+
+ ShardKeyPattern key;
+ bool unique;
+ };
+
+ /**
+ * top level grid configuration for an entire database
+ * TODO: use shared_ptr for ChunkManager
+ */
+ class DBConfig : public Model {
+ public:
+ DBConfig( string name = "" ) : _name( name ) , _primary("") , _shardingEnabled(false){ }
+
+ string getName(){ return _name; };
+
+ /**
+ * @return if anything in this db is partitioned or not
+ */
+ bool isShardingEnabled(){
+ return _shardingEnabled;
+ }
+
+ void enableSharding();
+ ChunkManager* shardCollection( const string& ns , ShardKeyPattern fieldsAndOrder , bool unique );
+
+ /**
+ * @return whether or not this partition is partitioned
+ */
+ bool isSharded( const string& ns );
+
+ ChunkManager* getChunkManager( const string& ns , bool reload = false );
+
+ /**
+ * @return the correct for shard for the ns
+ * if the namespace is sharded, will return an empty string
+ */
+ string getShard( const string& ns );
+
+ string getPrimary(){
+ if ( _primary.size() == 0 )
+ throw UserException( 8041 , (string)"no primary shard configured for db: " + _name );
+ return _primary;
+ }
+
+ void setPrimary( string s ){
+ _primary = s;
+ }
+
+ bool reload();
+
+ bool dropDatabase( string& errmsg );
+
+ virtual void save( bool check=true);
+
+ virtual string modelServer();
+
+ // model stuff
+
+ virtual const char * getNS(){ return "config.databases"; }
+ virtual void serialize(BSONObjBuilder& to);
+ virtual void unserialize(const BSONObj& from);
+
+ protected:
+
+ bool _dropShardedCollections( int& num, set<string>& allServers , string& errmsg );
+
+ bool doload();
+
+ /**
+ @return true if there was sharding info to remove
+ */
+ bool removeSharding( const string& ns );
+
+ string _name; // e.g. "alleyinsider"
+ string _primary; // e.g. localhost , mongo.foo.com:9999
+ bool _shardingEnabled;
+
+ map<string,CollectionInfo> _sharded; // { "alleyinsider.blog.posts" : { ts : 1 } , ... ] - all ns that are sharded
+ map<string,ChunkManager*> _shards; // this will only have entries for things that have been looked at
+
+ friend class Grid;
+ friend class ChunkManager;
+ };
+
+ /**
+ * stores meta-information about the grid
+ * TODO: used shard_ptr for DBConfig pointers
+ */
+ class Grid {
+ public:
+ /**
+ gets the config the db.
+ will return an empty DBConfig if not in db already
+ */
+ DBConfig * getDBConfig( string ns , bool create=true);
+
+ /**
+ * removes db entry.
+ * on next getDBConfig call will fetch from db
+ */
+ void removeDB( string db );
+
+ string pickShardForNewDB();
+
+ bool knowAboutShard( string name ) const;
+
+ unsigned long long getNextOpTime() const;
+ private:
+ map<string,DBConfig*> _databases;
+ boost::mutex _lock; // TODO: change to r/w lock
+ };
+
+ class ConfigServer : public DBConfig {
+ public:
+
+ ConfigServer();
+ ~ConfigServer();
+
+ bool ok(){
+ // TODO: check can connect
+ return _primary.size() > 0;
+ }
+
+ virtual string modelServer(){
+ uassert( 10190 , "ConfigServer not setup" , _primary.size() );
+ return _primary;
+ }
+
+ /**
+ call at startup, this will initiate connection to the grid db
+ */
+ bool init( vector<string> configHosts );
+
+ bool allUp();
+ bool allUp( string& errmsg );
+
+ int dbConfigVersion();
+ int dbConfigVersion( DBClientBase& conn );
+
+ /**
+ * @return 0 = ok, otherwise error #
+ */
+ int checkConfigVersion();
+
+ static int VERSION;
+
+ private:
+ string getHost( string name , bool withPort );
+ };
+
+} // namespace mongo
diff --git a/s/cursors.cpp b/s/cursors.cpp
new file mode 100644
index 0000000..23b8eaf
--- /dev/null
+++ b/s/cursors.cpp
@@ -0,0 +1,104 @@
+// cursors.cpp
+
+#include "stdafx.h"
+#include "cursors.h"
+#include "../client/connpool.h"
+#include "../db/queryutil.h"
+
+namespace mongo {
+
+ // -------- ShardedCursor -----------
+
+ ShardedClientCursor::ShardedClientCursor( QueryMessage& q , ClusteredCursor * cursor ){
+ assert( cursor );
+ _cursor = cursor;
+
+ _skip = q.ntoskip;
+ _ntoreturn = q.ntoreturn;
+
+ _totalSent = 0;
+ _done = false;
+
+ do {
+ // TODO: only create _id when needed
+ _id = security.getNonce();
+ } while ( _id == 0 );
+
+ }
+
+ ShardedClientCursor::~ShardedClientCursor(){
+ assert( _cursor );
+ delete _cursor;
+ _cursor = 0;
+ }
+
+ bool ShardedClientCursor::sendNextBatch( Request& r , int ntoreturn ){
+ uassert( 10191 , "cursor already done" , ! _done );
+
+ int maxSize = 1024 * 1024;
+ if ( _totalSent > 0 )
+ maxSize *= 3;
+
+ BufBuilder b(32768);
+
+ int num = 0;
+ bool sendMore = true;
+
+ while ( _cursor->more() ){
+ BSONObj o = _cursor->next();
+
+ b.append( (void*)o.objdata() , o.objsize() );
+ num++;
+
+ if ( b.len() > maxSize ){
+ break;
+ }
+
+ if ( num == ntoreturn ){
+ // soft limit aka batch size
+ break;
+ }
+
+ if ( ntoreturn != 0 && ( -1 * num + _totalSent ) == ntoreturn ){
+ // hard limit - total to send
+ sendMore = false;
+ break;
+ }
+ }
+
+ bool hasMore = sendMore && _cursor->more();
+ log(6) << "\t hasMore:" << hasMore << " wouldSendMoreIfHad: " << sendMore << " id:" << _id << " totalSent: " << _totalSent << endl;
+
+ replyToQuery( 0 , r.p() , r.m() , b.buf() , b.len() , num , _totalSent , hasMore ? _id : 0 );
+ _totalSent += num;
+ _done = ! hasMore;
+
+ return hasMore;
+ }
+
+
+ CursorCache::CursorCache(){
+ }
+
+ CursorCache::~CursorCache(){
+ // TODO: delete old cursors?
+ }
+
+ ShardedClientCursor* CursorCache::get( long long id ){
+ map<long long,ShardedClientCursor*>::iterator i = _cursors.find( id );
+ if ( i == _cursors.end() ){
+ OCCASIONALLY log() << "Sharded CursorCache missing cursor id: " << id << endl;
+ return 0;
+ }
+ return i->second;
+ }
+
+ void CursorCache::store( ShardedClientCursor * cursor ){
+ _cursors[cursor->getId()] = cursor;
+ }
+ void CursorCache::remove( long long id ){
+ _cursors.erase( id );
+ }
+
+ CursorCache cursorCache;
+}
diff --git a/s/cursors.h b/s/cursors.h
new file mode 100644
index 0000000..b1ed4b0
--- /dev/null
+++ b/s/cursors.h
@@ -0,0 +1,56 @@
+// cursors.h
+
+#pragma once
+
+#include "../stdafx.h"
+
+#include "../db/jsobj.h"
+#include "../db/dbmessage.h"
+#include "../client/dbclient.h"
+#include "../client/parallel.h"
+
+#include "request.h"
+
+namespace mongo {
+
+ class ShardedClientCursor {
+ public:
+ ShardedClientCursor( QueryMessage& q , ClusteredCursor * cursor );
+ virtual ~ShardedClientCursor();
+
+ long long getId(){ return _id; }
+
+ /**
+ * @return whether there is more data left
+ */
+ bool sendNextBatch( Request& r ){ return sendNextBatch( r , _ntoreturn ); }
+ bool sendNextBatch( Request& r , int ntoreturn );
+
+ protected:
+
+ ClusteredCursor * _cursor;
+
+ int _skip;
+ int _ntoreturn;
+
+ int _totalSent;
+ bool _done;
+
+ long long _id;
+ };
+
+ class CursorCache {
+ public:
+ CursorCache();
+ ~CursorCache();
+
+ ShardedClientCursor * get( long long id );
+ void store( ShardedClientCursor* cursor );
+ void remove( long long id );
+
+ private:
+ map<long long,ShardedClientCursor*> _cursors;
+ };
+
+ extern CursorCache cursorCache;
+}
diff --git a/s/d_logic.cpp b/s/d_logic.cpp
new file mode 100644
index 0000000..cc627eb
--- /dev/null
+++ b/s/d_logic.cpp
@@ -0,0 +1,542 @@
+// d_logic.cpp
+
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+
+/**
+ these are commands that live in mongod
+ mostly around shard management and checking
+ */
+
+#include "stdafx.h"
+#include <map>
+#include <string>
+
+#include "../db/commands.h"
+#include "../db/jsobj.h"
+#include "../db/dbmessage.h"
+
+#include "../client/connpool.h"
+
+#include "../util/queue.h"
+
+using namespace std;
+
+namespace mongo {
+
+ typedef map<string,unsigned long long> NSVersions;
+
+ NSVersions globalVersions;
+ boost::thread_specific_ptr<NSVersions> clientShardVersions;
+
+ string shardConfigServer;
+
+ boost::thread_specific_ptr<OID> clientServerIds;
+ map< string , BlockingQueue<BSONObj>* > clientQueues;
+
+ unsigned long long getVersion( BSONElement e , string& errmsg ){
+ if ( e.eoo() ){
+ errmsg = "no version";
+ return 0;
+ }
+
+ if ( e.isNumber() )
+ return (unsigned long long)e.number();
+
+ if ( e.type() == Date || e.type() == Timestamp )
+ return e._numberLong();
+
+
+ errmsg = "version is not a numberic type";
+ return 0;
+ }
+
+ class MongodShardCommand : public Command {
+ public:
+ MongodShardCommand( const char * n ) : Command( n ){
+ }
+ virtual bool slaveOk(){
+ return false;
+ }
+ virtual bool adminOnly() {
+ return true;
+ }
+ };
+
+ class WriteBackCommand : public MongodShardCommand {
+ public:
+ WriteBackCommand() : MongodShardCommand( "writebacklisten" ){}
+ bool run(const char *cmdns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
+
+ BSONElement e = cmdObj.firstElement();
+ if ( e.type() != jstOID ){
+ errmsg = "need oid as first value";
+ return 0;
+ }
+
+ const OID id = e.__oid();
+
+ dbtemprelease unlock;
+
+ if ( ! clientQueues[id.str()] )
+ clientQueues[id.str()] = new BlockingQueue<BSONObj>();
+
+ BSONObj z = clientQueues[id.str()]->blockingPop();
+ log(1) << "WriteBackCommand got : " << z << endl;
+
+ result.append( "data" , z );
+
+ return true;
+ }
+ } writeBackCommand;
+
+ // setShardVersion( ns )
+
+ class SetShardVersion : public MongodShardCommand {
+ public:
+ SetShardVersion() : MongodShardCommand("setShardVersion"){}
+
+ virtual void help( stringstream& help ) const {
+ help << " example: { setShardVersion : 'alleyinsider.foo' , version : 1 , configdb : '' } ";
+ }
+
+ bool run(const char *cmdns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
+
+ bool authoritative = cmdObj.getBoolField( "authoritative" );
+
+ string configdb = cmdObj["configdb"].valuestrsafe();
+ { // configdb checking
+ if ( configdb.size() == 0 ){
+ errmsg = "no configdb";
+ return false;
+ }
+
+ if ( shardConfigServer.size() == 0 ){
+ if ( ! authoritative ){
+ result.appendBool( "need_authoritative" , true );
+ errmsg = "first setShardVersion";
+ return false;
+ }
+ shardConfigServer = configdb;
+ }
+ else if ( shardConfigServer != configdb ){
+ errmsg = "specified a different configdb!";
+ return false;
+ }
+ }
+
+ { // setting up ids
+ if ( cmdObj["serverID"].type() != jstOID ){
+ // TODO: fix this
+ //errmsg = "need serverID to be an OID";
+ //return 0;
+ }
+ else {
+ OID clientId = cmdObj["serverID"].__oid();
+ if ( ! clientServerIds.get() ){
+ string s = clientId.str();
+
+ OID * nid = new OID();
+ nid->init( s );
+ clientServerIds.reset( nid );
+
+ if ( ! clientQueues[s] )
+ clientQueues[s] = new BlockingQueue<BSONObj>();
+ }
+ else if ( clientId != *clientServerIds.get() ){
+ errmsg = "server id has changed!";
+ return 0;
+ }
+ }
+ }
+
+ unsigned long long version = getVersion( cmdObj["version"] , errmsg );
+ if ( errmsg.size() ){
+ return false;
+ }
+
+ NSVersions * versions = clientShardVersions.get();
+
+ if ( ! versions ){
+ log(1) << "entering shard mode for connection" << endl;
+ versions = new NSVersions();
+ clientShardVersions.reset( versions );
+ }
+
+ string ns = cmdObj["setShardVersion"].valuestrsafe();
+ if ( ns.size() == 0 ){
+ errmsg = "need to speciy fully namespace";
+ return false;
+ }
+
+ unsigned long long& oldVersion = (*versions)[ns];
+ unsigned long long& globalVersion = globalVersions[ns];
+
+ if ( version == 0 && globalVersion == 0 ){
+ // this connection is cleaning itself
+ oldVersion = 0;
+ return 1;
+ }
+
+ if ( version == 0 && globalVersion > 0 ){
+ if ( ! authoritative ){
+ result.appendBool( "need_authoritative" , true );
+ result.appendTimestamp( "globalVersion" , globalVersion );
+ result.appendTimestamp( "oldVersion" , oldVersion );
+ errmsg = "dropping needs to be authoritative";
+ return 0;
+ }
+ log() << "wiping data for: " << ns << endl;
+ result.appendTimestamp( "beforeDrop" , globalVersion );
+ // only setting global version on purpose
+ // need clients to re-find meta-data
+ globalVersion = 0;
+ oldVersion = 0;
+ return 1;
+ }
+
+ if ( version < oldVersion ){
+ errmsg = "you already have a newer version";
+ result.appendTimestamp( "oldVersion" , oldVersion );
+ result.appendTimestamp( "newVersion" , version );
+ return false;
+ }
+
+ if ( version < globalVersion ){
+ errmsg = "going to older version for global";
+ return false;
+ }
+
+ if ( globalVersion == 0 && ! cmdObj.getBoolField( "authoritative" ) ){
+ // need authoritative for first look
+ result.appendBool( "need_authoritative" , true );
+ result.append( "ns" , ns );
+ errmsg = "first time for this ns";
+ return false;
+ }
+
+ result.appendTimestamp( "oldVersion" , oldVersion );
+ oldVersion = version;
+ globalVersion = version;
+
+ result.append( "ok" , 1 );
+ return 1;
+ }
+
+ } setShardVersion;
+
+ class GetShardVersion : public MongodShardCommand {
+ public:
+ GetShardVersion() : MongodShardCommand("getShardVersion"){}
+
+ virtual void help( stringstream& help ) const {
+ help << " example: { getShardVersion : 'alleyinsider.foo' } ";
+ }
+
+ bool run(const char *cmdns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
+ string ns = cmdObj["getShardVersion"].valuestrsafe();
+ if ( ns.size() == 0 ){
+ errmsg = "need to speciy fully namespace";
+ return false;
+ }
+
+ result.append( "configServer" , shardConfigServer.c_str() );
+
+ result.appendTimestamp( "global" , globalVersions[ns] );
+ if ( clientShardVersions.get() )
+ result.appendTimestamp( "mine" , (*clientShardVersions.get())[ns] );
+ else
+ result.appendTimestamp( "mine" , 0 );
+
+ return true;
+ }
+
+ } getShardVersion;
+
+ class MoveShardStartCommand : public MongodShardCommand {
+ public:
+ MoveShardStartCommand() : MongodShardCommand( "movechunk.start" ){}
+ virtual void help( stringstream& help ) const {
+ help << "should not be calling this directly" << endl;
+ }
+
+ bool run(const char *cmdns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
+ // so i have to start clone, tell caller its ok to make change
+ // at this point the caller locks me, and updates config db
+ // then finish calls finish, and then deletes data when cursors are done
+
+ string ns = cmdObj["movechunk.start"].valuestrsafe();
+ string to = cmdObj["to"].valuestrsafe();
+ string from = cmdObj["from"].valuestrsafe(); // my public address, a tad redundant, but safe
+ BSONObj filter = cmdObj.getObjectField( "filter" );
+
+ if ( ns.size() == 0 ){
+ errmsg = "need to specify namespace in command";
+ return false;
+ }
+
+ if ( to.size() == 0 ){
+ errmsg = "need to specify server to move shard to";
+ return false;
+ }
+ if ( from.size() == 0 ){
+ errmsg = "need to specify server to move shard from (redundat i know)";
+ return false;
+ }
+
+ if ( filter.isEmpty() ){
+ errmsg = "need to specify a filter";
+ return false;
+ }
+
+ log() << "got movechunk.start: " << cmdObj << endl;
+
+
+ BSONObj res;
+ bool ok;
+
+ {
+ dbtemprelease unlock;
+
+ ScopedDbConnection conn( to );
+ ok = conn->runCommand( "admin" ,
+ BSON( "startCloneCollection" << ns <<
+ "from" << from <<
+ "query" << filter
+ ) ,
+ res );
+ conn.done();
+ }
+
+ log() << " movechunk.start res: " << res << endl;
+
+ if ( ok ){
+ result.append( res["finishToken"] );
+ }
+ else {
+ errmsg = "startCloneCollection failed: ";
+ errmsg += res["errmsg"].valuestrsafe();
+ }
+ return ok;
+ }
+
+ } moveShardStartCmd;
+
+ class MoveShardFinishCommand : public MongodShardCommand {
+ public:
+ MoveShardFinishCommand() : MongodShardCommand( "movechunk.finish" ){}
+ virtual void help( stringstream& help ) const {
+ help << "should not be calling this directly" << endl;
+ }
+
+ bool run(const char *cmdns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
+ // see MoveShardStartCommand::run
+
+ string ns = cmdObj["movechunk.finish"].valuestrsafe();
+ if ( ns.size() == 0 ){
+ errmsg = "need ns as cmd value";
+ return false;
+ }
+
+ string to = cmdObj["to"].valuestrsafe();
+ if ( to.size() == 0 ){
+ errmsg = "need to specify server to move shard to";
+ return false;
+ }
+
+
+ unsigned long long newVersion = getVersion( cmdObj["newVersion"] , errmsg );
+ if ( newVersion == 0 ){
+ errmsg = "have to specify new version number";
+ return false;
+ }
+
+ BSONObj finishToken = cmdObj.getObjectField( "finishToken" );
+ if ( finishToken.isEmpty() ){
+ errmsg = "need finishToken";
+ return false;
+ }
+
+ if ( ns != finishToken["collection"].valuestrsafe() ){
+ errmsg = "namespaced don't match";
+ return false;
+ }
+
+ // now we're locked
+ globalVersions[ns] = newVersion;
+ NSVersions * versions = clientShardVersions.get();
+ if ( ! versions ){
+ versions = new NSVersions();
+ clientShardVersions.reset( versions );
+ }
+ (*versions)[ns] = newVersion;
+
+ BSONObj res;
+ bool ok;
+
+ {
+ dbtemprelease unlock;
+
+ ScopedDbConnection conn( to );
+ ok = conn->runCommand( "admin" ,
+ BSON( "finishCloneCollection" << finishToken ) ,
+ res );
+ conn.done();
+ }
+
+ if ( ! ok ){
+ // uh oh
+ errmsg = "finishCloneCollection failed!";
+ result << "finishError" << res;
+ return false;
+ }
+
+ // wait until cursors are clean
+ cout << "WARNING: deleting data before ensuring no more cursors TODO" << endl;
+
+ dbtemprelease unlock;
+
+ DBDirectClient client;
+ BSONObj removeFilter = finishToken.getObjectField( "query" );
+ client.remove( ns , removeFilter );
+
+ return true;
+ }
+
+ } moveShardFinishCmd;
+
+ bool haveLocalShardingInfo( const string& ns ){
+ if ( shardConfigServer.empty() )
+ return false;
+
+
+ unsigned long long version = globalVersions[ns];
+ if ( version == 0 )
+ return false;
+
+ NSVersions * versions = clientShardVersions.get();
+ if ( ! versions )
+ return false;
+
+ return true;
+ }
+
+ /**
+ * @ return true if not in sharded mode
+ or if version for this client is ok
+ */
+ bool shardVersionOk( const string& ns , string& errmsg ){
+ if ( shardConfigServer.empty() ){
+ return true;
+ }
+
+ NSVersions::iterator i = globalVersions.find( ns );
+ if ( i == globalVersions.end() )
+ return true;
+
+ NSVersions * versions = clientShardVersions.get();
+ if ( ! versions ){
+ // this means the client has nothing sharded
+ // so this allows direct connections to do whatever they want
+ // which i think is the correct behavior
+ return true;
+ }
+
+ unsigned long long clientVersion = (*versions)[ns];
+ unsigned long long version = i->second;
+
+ if ( version == 0 && clientVersion > 0 ){
+ stringstream ss;
+ ss << "version: " << version << " clientVersion: " << clientVersion;
+ errmsg = ss.str();
+ return false;
+ }
+
+ if ( clientVersion >= version )
+ return true;
+
+
+ if ( clientVersion == 0 ){
+ errmsg = "client in sharded mode, but doesn't have version set for this collection";
+ return false;
+ }
+
+ errmsg = (string)"your version is too old ns: " + ns;
+ return false;
+ }
+
+
+ bool handlePossibleShardedMessage( Message &m, DbResponse &dbresponse ){
+
+ if ( shardConfigServer.empty() ){
+ return false;
+ }
+
+ int op = m.data->operation();
+ if ( op < 2000 || op >= 3000 )
+ return false;
+
+
+ const char *ns = m.data->_data + 4;
+ string errmsg;
+ if ( shardVersionOk( ns , errmsg ) ){
+ return false;
+ }
+
+ log() << "shardVersionOk failed ns:" << ns << " " << errmsg << endl;
+
+ if ( doesOpGetAResponse( op ) ){
+ BufBuilder b( 32768 );
+ b.skip( sizeof( QueryResult ) );
+ {
+ BSONObj obj = BSON( "$err" << errmsg );
+ b.append( obj.objdata() , obj.objsize() );
+ }
+
+ QueryResult *qr = (QueryResult*)b.buf();
+ qr->_resultFlags() = QueryResult::ResultFlag_ErrSet | QueryResult::ResultFlag_ShardConfigStale;
+ qr->len = b.len();
+ qr->setOperation( opReply );
+ qr->cursorId = 0;
+ qr->startingFrom = 0;
+ qr->nReturned = 1;
+ b.decouple();
+
+ Message * resp = new Message();
+ resp->setData( qr , true );
+
+ dbresponse.response = resp;
+ dbresponse.responseTo = m.data->id;
+ return true;
+ }
+
+ OID * clientID = clientServerIds.get();
+ massert( 10422 , "write with bad shard config and no server id!" , clientID );
+
+ log() << "got write with an old config - writing back" << endl;
+
+ BSONObjBuilder b;
+ b.appendBool( "writeBack" , true );
+ b.append( "ns" , ns );
+ b.appendBinData( "msg" , m.data->len , bdtCustom , (char*)(m.data) );
+ log() << "writing back msg with len: " << m.data->len << " op: " << m.data->_operation << endl;
+ clientQueues[clientID->str()]->push( b.obj() );
+
+ return true;
+ }
+
+}
diff --git a/s/d_logic.h b/s/d_logic.h
new file mode 100644
index 0000000..3e483c4
--- /dev/null
+++ b/s/d_logic.h
@@ -0,0 +1,23 @@
+// d_logic.h
+
+#pragma once
+
+#include "../stdafx.h"
+
+namespace mongo {
+
+ /**
+ * @return true if we have any shard info for the ns
+ */
+ bool haveLocalShardingInfo( const string& ns );
+
+ /**
+ * @return true if the current threads shard version is ok, or not in sharded version
+ */
+ bool shardVersionOk( const string& ns , string& errmsg );
+
+ /**
+ * @return true if we took care of the message and nothing else should be done
+ */
+ bool handlePossibleShardedMessage( Message &m, DbResponse &dbresponse );
+}
diff --git a/s/d_util.cpp b/s/d_util.cpp
new file mode 100644
index 0000000..8c30d2e
--- /dev/null
+++ b/s/d_util.cpp
@@ -0,0 +1,36 @@
+// util.cpp
+
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+
+/**
+ these are commands that live in mongod
+ mostly around shard management and checking
+ */
+
+#include "stdafx.h"
+#include "util.h"
+
+using namespace std;
+
+namespace mongo {
+
+ void checkShardVersion( DBClientBase & conn , const string& ns , bool authoritative ){
+ // no-op in mongod
+ }
+
+}
diff --git a/s/dbgrid.vcproj b/s/dbgrid.vcproj
new file mode 100644
index 0000000..2c8ef85
--- /dev/null
+++ b/s/dbgrid.vcproj
@@ -0,0 +1,660 @@
+<?xml version="1.0" encoding="Windows-1252"?>
+<VisualStudioProject
+ ProjectType="Visual C++"
+ Version="9.00"
+ Name="mongos"
+ ProjectGUID="{E03717ED-69B4-4D21-BC55-DF6690B585C6}"
+ RootNamespace="dbgrid"
+ Keyword="Win32Proj"
+ TargetFrameworkVersion="196613"
+ >
+ <Platforms>
+ <Platform
+ Name="Win32"
+ />
+ </Platforms>
+ <ToolFiles>
+ </ToolFiles>
+ <Configurations>
+ <Configuration
+ Name="Debug|Win32"
+ OutputDirectory="$(SolutionDir)$(ConfigurationName)"
+ IntermediateDirectory="$(ConfigurationName)"
+ ConfigurationType="1"
+ CharacterSet="1"
+ >
+ <Tool
+ Name="VCPreBuildEventTool"
+ />
+ <Tool
+ Name="VCCustomBuildTool"
+ />
+ <Tool
+ Name="VCXMLDataGeneratorTool"
+ />
+ <Tool
+ Name="VCWebServiceProxyGeneratorTool"
+ />
+ <Tool
+ Name="VCMIDLTool"
+ />
+ <Tool
+ Name="VCCLCompilerTool"
+ Optimization="0"
+ AdditionalIncludeDirectories="&quot;..\pcre-7.4&quot;;&quot;C:\Program Files\boost\boost_1_35_0&quot;"
+ PreprocessorDefinitions="USE_ASIO;WIN32;_DEBUG;_CONSOLE;_CRT_SECURE_NO_WARNINGS;HAVE_CONFIG_H;PCRE_STATIC"
+ MinimalRebuild="true"
+ BasicRuntimeChecks="3"
+ RuntimeLibrary="3"
+ UsePrecompiledHeader="2"
+ PrecompiledHeaderThrough="stdafx.h"
+ WarningLevel="3"
+ DebugInformationFormat="4"
+ DisableSpecificWarnings="4355;4800"
+ />
+ <Tool
+ Name="VCManagedResourceCompilerTool"
+ />
+ <Tool
+ Name="VCResourceCompilerTool"
+ />
+ <Tool
+ Name="VCPreLinkEventTool"
+ />
+ <Tool
+ Name="VCLinkerTool"
+ AdditionalDependencies="ws2_32.lib"
+ LinkIncremental="2"
+ AdditionalLibraryDirectories="&quot;c:\program files\boost\boost_1_35_0\lib&quot;"
+ GenerateDebugInformation="true"
+ SubSystem="1"
+ TargetMachine="1"
+ />
+ <Tool
+ Name="VCALinkTool"
+ />
+ <Tool
+ Name="VCManifestTool"
+ />
+ <Tool
+ Name="VCXDCMakeTool"
+ />
+ <Tool
+ Name="VCBscMakeTool"
+ />
+ <Tool
+ Name="VCFxCopTool"
+ />
+ <Tool
+ Name="VCAppVerifierTool"
+ />
+ <Tool
+ Name="VCPostBuildEventTool"
+ />
+ </Configuration>
+ <Configuration
+ Name="Release|Win32"
+ OutputDirectory="$(SolutionDir)$(ConfigurationName)"
+ IntermediateDirectory="$(ConfigurationName)"
+ ConfigurationType="1"
+ CharacterSet="1"
+ WholeProgramOptimization="1"
+ >
+ <Tool
+ Name="VCPreBuildEventTool"
+ />
+ <Tool
+ Name="VCCustomBuildTool"
+ />
+ <Tool
+ Name="VCXMLDataGeneratorTool"
+ />
+ <Tool
+ Name="VCWebServiceProxyGeneratorTool"
+ />
+ <Tool
+ Name="VCMIDLTool"
+ />
+ <Tool
+ Name="VCCLCompilerTool"
+ Optimization="2"
+ EnableIntrinsicFunctions="true"
+ AdditionalIncludeDirectories="&quot;..\pcre-7.4&quot;;&quot;c:\Program Files\boost\boost_1_35_0\&quot;"
+ PreprocessorDefinitions="USE_ASIO;WIN32;NDEBUG;_CONSOLE;_CRT_SECURE_NO_WARNINGS;PCRE_STATIC"
+ RuntimeLibrary="2"
+ EnableFunctionLevelLinking="true"
+ UsePrecompiledHeader="2"
+ WarningLevel="3"
+ DebugInformationFormat="3"
+ DisableSpecificWarnings="4355;4800"
+ />
+ <Tool
+ Name="VCManagedResourceCompilerTool"
+ />
+ <Tool
+ Name="VCResourceCompilerTool"
+ />
+ <Tool
+ Name="VCPreLinkEventTool"
+ />
+ <Tool
+ Name="VCLinkerTool"
+ AdditionalDependencies="ws2_32.lib"
+ LinkIncremental="1"
+ AdditionalLibraryDirectories="&quot;c:\Program Files\boost\boost_1_35_0\lib&quot;"
+ GenerateDebugInformation="true"
+ SubSystem="1"
+ OptimizeReferences="2"
+ EnableCOMDATFolding="2"
+ TargetMachine="1"
+ />
+ <Tool
+ Name="VCALinkTool"
+ />
+ <Tool
+ Name="VCManifestTool"
+ />
+ <Tool
+ Name="VCXDCMakeTool"
+ />
+ <Tool
+ Name="VCBscMakeTool"
+ />
+ <Tool
+ Name="VCFxCopTool"
+ />
+ <Tool
+ Name="VCAppVerifierTool"
+ />
+ <Tool
+ Name="VCPostBuildEventTool"
+ />
+ </Configuration>
+ <Configuration
+ Name="release_nojni|Win32"
+ OutputDirectory="$(SolutionDir)$(ConfigurationName)"
+ IntermediateDirectory="$(ConfigurationName)"
+ ConfigurationType="1"
+ CharacterSet="1"
+ WholeProgramOptimization="1"
+ >
+ <Tool
+ Name="VCPreBuildEventTool"
+ />
+ <Tool
+ Name="VCCustomBuildTool"
+ />
+ <Tool
+ Name="VCXMLDataGeneratorTool"
+ />
+ <Tool
+ Name="VCWebServiceProxyGeneratorTool"
+ />
+ <Tool
+ Name="VCMIDLTool"
+ />
+ <Tool
+ Name="VCCLCompilerTool"
+ Optimization="2"
+ EnableIntrinsicFunctions="true"
+ AdditionalIncludeDirectories="&quot;..\pcre-7.4&quot;;&quot;c:\Program Files\boost\boost_1_35_0\&quot;"
+ PreprocessorDefinitions="WIN32;NDEBUG;_CONSOLE;_CRT_SECURE_NO_WARNINGS;PCRE_STATIC"
+ RuntimeLibrary="2"
+ EnableFunctionLevelLinking="true"
+ UsePrecompiledHeader="2"
+ WarningLevel="3"
+ DebugInformationFormat="3"
+ DisableSpecificWarnings="4355"
+ />
+ <Tool
+ Name="VCManagedResourceCompilerTool"
+ />
+ <Tool
+ Name="VCResourceCompilerTool"
+ />
+ <Tool
+ Name="VCPreLinkEventTool"
+ />
+ <Tool
+ Name="VCLinkerTool"
+ AdditionalDependencies="ws2_32.lib"
+ LinkIncremental="1"
+ AdditionalLibraryDirectories="&quot;c:\Program Files\boost\boost_1_35_0\lib&quot;"
+ GenerateDebugInformation="true"
+ SubSystem="1"
+ OptimizeReferences="2"
+ EnableCOMDATFolding="2"
+ TargetMachine="1"
+ />
+ <Tool
+ Name="VCALinkTool"
+ />
+ <Tool
+ Name="VCManifestTool"
+ />
+ <Tool
+ Name="VCXDCMakeTool"
+ />
+ <Tool
+ Name="VCBscMakeTool"
+ />
+ <Tool
+ Name="VCFxCopTool"
+ />
+ <Tool
+ Name="VCAppVerifierTool"
+ />
+ <Tool
+ Name="VCPostBuildEventTool"
+ />
+ </Configuration>
+ <Configuration
+ Name="Debug Recstore|Win32"
+ OutputDirectory="$(SolutionDir)$(ConfigurationName)"
+ IntermediateDirectory="$(ConfigurationName)"
+ ConfigurationType="1"
+ CharacterSet="1"
+ >
+ <Tool
+ Name="VCPreBuildEventTool"
+ />
+ <Tool
+ Name="VCCustomBuildTool"
+ />
+ <Tool
+ Name="VCXMLDataGeneratorTool"
+ />
+ <Tool
+ Name="VCWebServiceProxyGeneratorTool"
+ />
+ <Tool
+ Name="VCMIDLTool"
+ />
+ <Tool
+ Name="VCCLCompilerTool"
+ Optimization="0"
+ AdditionalIncludeDirectories="&quot;..\pcre-7.4&quot;;&quot;C:\Program Files\boost\boost_1_35_0&quot;"
+ PreprocessorDefinitions="USE_ASIO;WIN32;_DEBUG;_CONSOLE;_CRT_SECURE_NO_WARNINGS;HAVE_CONFIG_H;PCRE_STATIC"
+ MinimalRebuild="true"
+ BasicRuntimeChecks="3"
+ RuntimeLibrary="3"
+ UsePrecompiledHeader="2"
+ PrecompiledHeaderThrough="stdafx.h"
+ WarningLevel="3"
+ DebugInformationFormat="4"
+ DisableSpecificWarnings="4355;4800"
+ />
+ <Tool
+ Name="VCManagedResourceCompilerTool"
+ />
+ <Tool
+ Name="VCResourceCompilerTool"
+ />
+ <Tool
+ Name="VCPreLinkEventTool"
+ />
+ <Tool
+ Name="VCLinkerTool"
+ AdditionalDependencies="ws2_32.lib"
+ LinkIncremental="2"
+ AdditionalLibraryDirectories="&quot;c:\program files\boost\boost_1_35_0\lib&quot;"
+ GenerateDebugInformation="true"
+ SubSystem="1"
+ TargetMachine="1"
+ />
+ <Tool
+ Name="VCALinkTool"
+ />
+ <Tool
+ Name="VCManifestTool"
+ />
+ <Tool
+ Name="VCXDCMakeTool"
+ />
+ <Tool
+ Name="VCBscMakeTool"
+ />
+ <Tool
+ Name="VCFxCopTool"
+ />
+ <Tool
+ Name="VCAppVerifierTool"
+ />
+ <Tool
+ Name="VCPostBuildEventTool"
+ />
+ </Configuration>
+ </Configurations>
+ <References>
+ </References>
+ <Files>
+ <Filter
+ Name="Source Files"
+ Filter="cpp;c;cc;cxx;def;odl;idl;hpj;bat;asm;asmx"
+ UniqueIdentifier="{4FC737F1-C7A5-4376-A066-2A32D752A2FF}"
+ >
+ <File
+ RelativePath=".\chunk.cpp"
+ >
+ </File>
+ <File
+ RelativePath=".\commands_admin.cpp"
+ >
+ </File>
+ <File
+ RelativePath=".\commands_public.cpp"
+ >
+ </File>
+ <File
+ RelativePath=".\config.cpp"
+ >
+ </File>
+ <File
+ RelativePath=".\cursors.cpp"
+ >
+ </File>
+ <File
+ RelativePath="..\db\queryutil.cpp"
+ >
+ </File>
+ <File
+ RelativePath=".\request.cpp"
+ >
+ </File>
+ <File
+ RelativePath=".\server.cpp"
+ >
+ </File>
+ <File
+ RelativePath=".\shardkey.cpp"
+ >
+ </File>
+ <File
+ RelativePath="..\stdafx.cpp"
+ >
+ <FileConfiguration
+ Name="Debug|Win32"
+ >
+ <Tool
+ Name="VCCLCompilerTool"
+ UsePrecompiledHeader="1"
+ />
+ </FileConfiguration>
+ <FileConfiguration
+ Name="Release|Win32"
+ >
+ <Tool
+ Name="VCCLCompilerTool"
+ UsePrecompiledHeader="1"
+ />
+ </FileConfiguration>
+ <FileConfiguration
+ Name="release_nojni|Win32"
+ >
+ <Tool
+ Name="VCCLCompilerTool"
+ UsePrecompiledHeader="1"
+ />
+ </FileConfiguration>
+ <FileConfiguration
+ Name="Debug Recstore|Win32"
+ >
+ <Tool
+ Name="VCCLCompilerTool"
+ UsePrecompiledHeader="1"
+ />
+ </FileConfiguration>
+ </File>
+ <File
+ RelativePath=".\strategy.cpp"
+ >
+ </File>
+ <File
+ RelativePath=".\strategy_shard.cpp"
+ >
+ </File>
+ <File
+ RelativePath=".\strategy_single.cpp"
+ >
+ </File>
+ <Filter
+ Name="Shared Source Files"
+ >
+ <File
+ RelativePath="..\util\assert_util.cpp"
+ >
+ </File>
+ <File
+ RelativePath="..\util\background.cpp"
+ >
+ </File>
+ <File
+ RelativePath="..\util\base64.cpp"
+ >
+ </File>
+ <File
+ RelativePath="..\db\commands.cpp"
+ >
+ </File>
+ <File
+ RelativePath="..\util\debug_util.cpp"
+ >
+ </File>
+ <File
+ RelativePath="..\db\jsobj.cpp"
+ >
+ </File>
+ <File
+ RelativePath="..\db\json.cpp"
+ >
+ </File>
+ <File
+ RelativePath="..\db\lasterror.cpp"
+ >
+ </File>
+ <File
+ RelativePath="..\util\md5.c"
+ >
+ <FileConfiguration
+ Name="Debug|Win32"
+ >
+ <Tool
+ Name="VCCLCompilerTool"
+ UsePrecompiledHeader="0"
+ />
+ </FileConfiguration>
+ <FileConfiguration
+ Name="Release|Win32"
+ >
+ <Tool
+ Name="VCCLCompilerTool"
+ UsePrecompiledHeader="0"
+ />
+ </FileConfiguration>
+ <FileConfiguration
+ Name="Debug Recstore|Win32"
+ >
+ <Tool
+ Name="VCCLCompilerTool"
+ UsePrecompiledHeader="0"
+ />
+ </FileConfiguration>
+ </File>
+ <File
+ RelativePath="..\util\md5main.cpp"
+ >
+ <FileConfiguration
+ Name="Debug|Win32"
+ >
+ <Tool
+ Name="VCCLCompilerTool"
+ UsePrecompiledHeader="2"
+ />
+ </FileConfiguration>
+ </File>
+ <File
+ RelativePath="..\util\message.cpp"
+ >
+ </File>
+ <File
+ RelativePath="..\util\message_server_asio.cpp"
+ >
+ <FileConfiguration
+ Name="Debug|Win32"
+ >
+ <Tool
+ Name="VCCLCompilerTool"
+ UsePrecompiledHeader="0"
+ />
+ </FileConfiguration>
+ <FileConfiguration
+ Name="Release|Win32"
+ >
+ <Tool
+ Name="VCCLCompilerTool"
+ UsePrecompiledHeader="0"
+ />
+ </FileConfiguration>
+ <FileConfiguration
+ Name="release_nojni|Win32"
+ >
+ <Tool
+ Name="VCCLCompilerTool"
+ UsePrecompiledHeader="0"
+ />
+ </FileConfiguration>
+ <FileConfiguration
+ Name="Debug Recstore|Win32"
+ >
+ <Tool
+ Name="VCCLCompilerTool"
+ UsePrecompiledHeader="0"
+ />
+ </FileConfiguration>
+ </File>
+ <File
+ RelativePath="..\db\nonce.cpp"
+ >
+ </File>
+ <File
+ RelativePath="..\client\parallel.cpp"
+ >
+ </File>
+ <File
+ RelativePath="..\util\sock.cpp"
+ >
+ </File>
+ <File
+ RelativePath="..\util\thread_pool.cpp"
+ >
+ </File>
+ <File
+ RelativePath="..\util\util.cpp"
+ >
+ </File>
+ </Filter>
+ </Filter>
+ <Filter
+ Name="Header Files"
+ Filter="h;hpp;hxx;hm;inl;inc;xsd"
+ UniqueIdentifier="{93995380-89BD-4b04-88EB-625FBE52EBFB}"
+ >
+ <File
+ RelativePath=".\gridconfig.h"
+ >
+ </File>
+ <File
+ RelativePath=".\griddatabase.h"
+ >
+ </File>
+ <File
+ RelativePath=".\shard.h"
+ >
+ </File>
+ <File
+ RelativePath=".\strategy.h"
+ >
+ </File>
+ <Filter
+ Name="Header Shared"
+ >
+ <File
+ RelativePath="..\util\background.h"
+ >
+ </File>
+ <File
+ RelativePath="..\db\commands.h"
+ >
+ </File>
+ <File
+ RelativePath="..\client\connpool.cpp"
+ >
+ </File>
+ <File
+ RelativePath="..\db\dbmessage.h"
+ >
+ </File>
+ <File
+ RelativePath="..\util\goodies.h"
+ >
+ </File>
+ <File
+ RelativePath="..\db\jsobj.h"
+ >
+ </File>
+ <File
+ RelativePath="..\db\json.h"
+ >
+ </File>
+ <File
+ RelativePath="..\stdafx.h"
+ >
+ </File>
+ </Filter>
+ </Filter>
+ <Filter
+ Name="Resource Files"
+ Filter="rc;ico;cur;bmp;dlg;rc2;rct;bin;rgs;gif;jpg;jpeg;jpe;resx;tiff;tif;png;wav"
+ UniqueIdentifier="{67DA6AB6-F800-4c08-8B7A-83BB121AAD01}"
+ >
+ </Filter>
+ <Filter
+ Name="libs_etc"
+ >
+ <File
+ RelativePath="..\..\boostw\boost_1_34_1\boost\config\auto_link.hpp"
+ >
+ </File>
+ <File
+ RelativePath="..\..\boostw\boost_1_34_1\boost\version.hpp"
+ >
+ </File>
+ </Filter>
+ <Filter
+ Name="client"
+ >
+ <File
+ RelativePath="..\client\connpool.h"
+ >
+ </File>
+ <File
+ RelativePath="..\client\dbclient.cpp"
+ >
+ </File>
+ <File
+ RelativePath="..\client\dbclient.h"
+ >
+ </File>
+ <File
+ RelativePath="..\client\model.cpp"
+ >
+ </File>
+ <File
+ RelativePath="..\client\model.h"
+ >
+ </File>
+ </Filter>
+ </Files>
+ <Globals>
+ </Globals>
+</VisualStudioProject>
diff --git a/s/dbgrid.vcxproj b/s/dbgrid.vcxproj
new file mode 100644
index 0000000..e997055
--- /dev/null
+++ b/s/dbgrid.vcxproj
@@ -0,0 +1,201 @@
+<?xml version="1.0" encoding="utf-8"?>
+<Project DefaultTargets="Build" ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <ItemGroup Label="ProjectConfigurations">
+ <ProjectConfiguration Include="Debug Recstore|Win32">
+ <Configuration>Debug Recstore</Configuration>
+ <Platform>Win32</Platform>
+ </ProjectConfiguration>
+ <ProjectConfiguration Include="Debug|Win32">
+ <Configuration>Debug</Configuration>
+ <Platform>Win32</Platform>
+ </ProjectConfiguration>
+ <ProjectConfiguration Include="Release|Win32">
+ <Configuration>Release</Configuration>
+ <Platform>Win32</Platform>
+ </ProjectConfiguration>
+ </ItemGroup>
+ <PropertyGroup Label="Globals">
+ <ProjectName>mongos</ProjectName>
+ <ProjectGuid>{E03717ED-69B4-4D21-BC55-DF6690B585C6}</ProjectGuid>
+ <RootNamespace>dbgrid</RootNamespace>
+ <Keyword>Win32Proj</Keyword>
+ </PropertyGroup>
+ <Import Project="$(VCTargetsPath)\Microsoft.Cpp.Default.props" />
+ <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug Recstore|Win32'" Label="Configuration">
+ <ConfigurationType>Application</ConfigurationType>
+ <CharacterSet>Unicode</CharacterSet>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'" Label="Configuration">
+ <ConfigurationType>Application</ConfigurationType>
+ <CharacterSet>Unicode</CharacterSet>
+ <WholeProgramOptimization>true</WholeProgramOptimization>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'" Label="Configuration">
+ <ConfigurationType>Application</ConfigurationType>
+ <CharacterSet>Unicode</CharacterSet>
+ </PropertyGroup>
+ <Import Project="$(VCTargetsPath)\Microsoft.Cpp.props" />
+ <ImportGroup Label="ExtensionSettings">
+ </ImportGroup>
+ <ImportGroup Condition="'$(Configuration)|$(Platform)'=='Debug Recstore|Win32'" Label="PropertySheets">
+ <Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" />
+ </ImportGroup>
+ <ImportGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'" Label="PropertySheets">
+ <Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" />
+ </ImportGroup>
+ <ImportGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'" Label="PropertySheets">
+ <Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" />
+ </ImportGroup>
+ <PropertyGroup Label="UserMacros" />
+ <PropertyGroup>
+ <_ProjectFileVersion>10.0.21006.1</_ProjectFileVersion>
+ <OutDir Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">$(SolutionDir)$(Configuration)\</OutDir>
+ <IntDir Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">$(Configuration)\</IntDir>
+ <LinkIncremental Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</LinkIncremental>
+ <OutDir Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">$(SolutionDir)$(Configuration)\</OutDir>
+ <IntDir Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">$(Configuration)\</IntDir>
+ <LinkIncremental Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">false</LinkIncremental>
+ <OutDir Condition="'$(Configuration)|$(Platform)'=='Debug Recstore|Win32'">$(SolutionDir)$(Configuration)\</OutDir>
+ <IntDir Condition="'$(Configuration)|$(Platform)'=='Debug Recstore|Win32'">$(Configuration)\</IntDir>
+ <LinkIncremental Condition="'$(Configuration)|$(Platform)'=='Debug Recstore|Win32'">true</LinkIncremental>
+ </PropertyGroup>
+ <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
+ <ClCompile>
+ <Optimization>Disabled</Optimization>
+ <AdditionalIncludeDirectories>..\pcre-7.4;C:\Program Files\boost\boost_1_41_0;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
+ <PreprocessorDefinitions>USE_ASIO;WIN32;_DEBUG;_CONSOLE;_CRT_SECURE_NO_WARNINGS;HAVE_CONFIG_H;PCRE_STATIC;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+ <MinimalRebuild>true</MinimalRebuild>
+ <BasicRuntimeChecks>EnableFastChecks</BasicRuntimeChecks>
+ <RuntimeLibrary>MultiThreadedDebugDLL</RuntimeLibrary>
+ <PrecompiledHeader>Use</PrecompiledHeader>
+ <PrecompiledHeaderFile>stdafx.h</PrecompiledHeaderFile>
+ <WarningLevel>Level3</WarningLevel>
+ <DebugInformationFormat>EditAndContinue</DebugInformationFormat>
+ <DisableSpecificWarnings>4355;4800;%(DisableSpecificWarnings)</DisableSpecificWarnings>
+ </ClCompile>
+ <Link>
+ <AdditionalDependencies>ws2_32.lib;%(AdditionalDependencies)</AdditionalDependencies>
+ <AdditionalLibraryDirectories>c:\program files\boost\boost_1_41_0\lib;%(AdditionalLibraryDirectories)</AdditionalLibraryDirectories>
+ <GenerateDebugInformation>true</GenerateDebugInformation>
+ <SubSystem>Console</SubSystem>
+ <TargetMachine>MachineX86</TargetMachine>
+ </Link>
+ </ItemDefinitionGroup>
+ <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
+ <ClCompile>
+ <Optimization>MaxSpeed</Optimization>
+ <IntrinsicFunctions>true</IntrinsicFunctions>
+ <PreprocessorDefinitions>USE_ASIO;WIN32;NDEBUG;_CONSOLE;_CRT_SECURE_NO_WARNINGS;PCRE_STATIC;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+ <RuntimeLibrary>MultiThreadedDLL</RuntimeLibrary>
+ <FunctionLevelLinking>true</FunctionLevelLinking>
+ <PrecompiledHeader>Use</PrecompiledHeader>
+ <WarningLevel>Level3</WarningLevel>
+ <DebugInformationFormat>ProgramDatabase</DebugInformationFormat>
+ <DisableSpecificWarnings>4355;4800;%(DisableSpecificWarnings)</DisableSpecificWarnings>
+ </ClCompile>
+ <Link>
+ <AdditionalDependencies>ws2_32.lib;%(AdditionalDependencies)</AdditionalDependencies>
+ <AdditionalLibraryDirectories>c:\Program Files\boost\boost_1_41_0\lib;%(AdditionalLibraryDirectories)</AdditionalLibraryDirectories>
+ <GenerateDebugInformation>true</GenerateDebugInformation>
+ <SubSystem>Console</SubSystem>
+ <OptimizeReferences>true</OptimizeReferences>
+ <EnableCOMDATFolding>true</EnableCOMDATFolding>
+ <TargetMachine>MachineX86</TargetMachine>
+ </Link>
+ </ItemDefinitionGroup>
+ <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug Recstore|Win32'">
+ <ClCompile>
+ <Optimization>Disabled</Optimization>
+ <AdditionalIncludeDirectories>..\pcre-7.4;C:\Program Files\boost\boost_1_41_0;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
+ <PreprocessorDefinitions>USE_ASIO;WIN32;_DEBUG;_CONSOLE;_CRT_SECURE_NO_WARNINGS;HAVE_CONFIG_H;PCRE_STATIC;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+ <MinimalRebuild>true</MinimalRebuild>
+ <BasicRuntimeChecks>EnableFastChecks</BasicRuntimeChecks>
+ <RuntimeLibrary>MultiThreadedDebugDLL</RuntimeLibrary>
+ <PrecompiledHeader>Use</PrecompiledHeader>
+ <PrecompiledHeaderFile>stdafx.h</PrecompiledHeaderFile>
+ <WarningLevel>Level3</WarningLevel>
+ <DebugInformationFormat>EditAndContinue</DebugInformationFormat>
+ <DisableSpecificWarnings>4355;4800;%(DisableSpecificWarnings)</DisableSpecificWarnings>
+ </ClCompile>
+ <Link>
+ <AdditionalDependencies>ws2_32.lib;%(AdditionalDependencies)</AdditionalDependencies>
+ <AdditionalLibraryDirectories>c:\program files\boost\boost_1_41_0\lib;%(AdditionalLibraryDirectories)</AdditionalLibraryDirectories>
+ <GenerateDebugInformation>true</GenerateDebugInformation>
+ <SubSystem>Console</SubSystem>
+ <TargetMachine>MachineX86</TargetMachine>
+ </Link>
+ </ItemDefinitionGroup>
+ <ItemGroup>
+ <ClCompile Include="chunk.cpp" />
+ <ClCompile Include="commands_admin.cpp" />
+ <ClCompile Include="commands_public.cpp" />
+ <ClCompile Include="config.cpp" />
+ <ClCompile Include="cursors.cpp" />
+ <ClCompile Include="..\db\queryutil.cpp" />
+ <ClCompile Include="request.cpp" />
+ <ClCompile Include="server.cpp" />
+ <ClCompile Include="shardkey.cpp" />
+ <ClCompile Include="..\stdafx.cpp">
+ <PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Debug Recstore|Win32'">Create</PrecompiledHeader>
+ <PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">Create</PrecompiledHeader>
+ <PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">Create</PrecompiledHeader>
+ </ClCompile>
+ <ClCompile Include="strategy.cpp" />
+ <ClCompile Include="strategy_shard.cpp" />
+ <ClCompile Include="strategy_single.cpp" />
+ <ClCompile Include="..\util\assert_util.cpp" />
+ <ClCompile Include="..\util\background.cpp" />
+ <ClCompile Include="..\util\base64.cpp" />
+ <ClCompile Include="..\db\commands.cpp" />
+ <ClCompile Include="..\util\debug_util.cpp" />
+ <ClCompile Include="..\db\jsobj.cpp" />
+ <ClCompile Include="..\db\json.cpp" />
+ <ClCompile Include="..\db\lasterror.cpp" />
+ <ClCompile Include="..\util\md5.c">
+ <PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Debug Recstore|Win32'">
+ </PrecompiledHeader>
+ <PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
+ </PrecompiledHeader>
+ <PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
+ </PrecompiledHeader>
+ </ClCompile>
+ <ClCompile Include="..\util\md5main.cpp">
+ <PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">Use</PrecompiledHeader>
+ </ClCompile>
+ <ClCompile Include="..\util\message.cpp" />
+ <ClCompile Include="..\util\message_server_asio.cpp">
+ <PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Debug Recstore|Win32'">
+ </PrecompiledHeader>
+ <PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
+ </PrecompiledHeader>
+ <PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
+ </PrecompiledHeader>
+ </ClCompile>
+ <ClCompile Include="..\db\nonce.cpp" />
+ <ClCompile Include="..\client\parallel.cpp" />
+ <ClCompile Include="..\util\sock.cpp" />
+ <ClCompile Include="..\util\util.cpp" />
+ <ClCompile Include="..\client\connpool.cpp" />
+ <ClCompile Include="..\client\dbclient.cpp" />
+ <ClCompile Include="..\client\model.cpp" />
+ </ItemGroup>
+ <ItemGroup>
+ <ClInclude Include="gridconfig.h" />
+ <ClInclude Include="griddatabase.h" />
+ <ClInclude Include="shard.h" />
+ <ClInclude Include="strategy.h" />
+ <ClInclude Include="..\util\background.h" />
+ <ClInclude Include="..\db\commands.h" />
+ <ClInclude Include="..\db\dbmessage.h" />
+ <ClInclude Include="..\util\goodies.h" />
+ <ClInclude Include="..\db\jsobj.h" />
+ <ClInclude Include="..\db\json.h" />
+ <ClInclude Include="..\stdafx.h" />
+ <ClInclude Include="..\client\connpool.h" />
+ <ClInclude Include="..\client\dbclient.h" />
+ <ClInclude Include="..\client\model.h" />
+ </ItemGroup>
+ <Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
+ <ImportGroup Label="ExtensionTargets">
+ </ImportGroup>
+</Project>
diff --git a/s/request.cpp b/s/request.cpp
new file mode 100644
index 0000000..8bebd64
--- /dev/null
+++ b/s/request.cpp
@@ -0,0 +1,175 @@
+/* dbgrid/request.cpp
+
+ Top level handling of requests (operations such as query, insert, ...)
+*/
+
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "stdafx.h"
+#include "server.h"
+#include "../db/commands.h"
+#include "../db/dbmessage.h"
+#include "../client/connpool.h"
+
+#include "request.h"
+#include "config.h"
+#include "chunk.h"
+
+namespace mongo {
+
+ Request::Request( Message& m, AbstractMessagingPort* p ) :
+ _m(m) , _d( m ) , _p(p){
+
+ assert( _d.getns() );
+ _id = _m.data->id;
+
+ _clientId = p ? p->remotePort() << 16 : 0;
+ _clientInfo = ClientInfo::get( _clientId );
+ _clientInfo->newRequest();
+
+ reset();
+ }
+
+ void Request::reset( bool reload ){
+ _config = grid.getDBConfig( getns() );
+ if ( reload )
+ uassert( 10192 , "db config reload failed!" , _config->reload() );
+
+ if ( _config->isSharded( getns() ) ){
+ _chunkManager = _config->getChunkManager( getns() , reload );
+ uassert( 10193 , (string)"no shard info for: " + getns() , _chunkManager );
+ }
+ else {
+ _chunkManager = 0;
+ }
+
+ _m.data->id = _id;
+
+ }
+
+ string Request::singleServerName(){
+ if ( _chunkManager ){
+ if ( _chunkManager->numChunks() > 1 )
+ throw UserException( 8060 , "can't call singleServerName on a sharded collection" );
+ return _chunkManager->findChunk( _chunkManager->getShardKey().globalMin() ).getShard();
+ }
+ string s = _config->getShard( getns() );
+ uassert( 10194 , "can't call singleServerName on a sharded collection!" , s.size() > 0 );
+ return s;
+ }
+
+ void Request::process( int attempt ){
+
+ log(2) << "Request::process ns: " << getns() << " msg id:" << (int)(_m.data->id) << " attempt: " << attempt << endl;
+
+ int op = _m.data->operation();
+ assert( op > dbMsg );
+
+ Strategy * s = SINGLE;
+
+ _d.markSet();
+
+ if ( _chunkManager ){
+ s = SHARDED;
+ }
+
+ if ( op == dbQuery ) {
+ try {
+ s->queryOp( *this );
+ }
+ catch ( StaleConfigException& staleConfig ){
+ log() << staleConfig.what() << " attempt: " << attempt << endl;
+ uassert( 10195 , "too many attempts to update config, failing" , attempt < 5 );
+
+ sleepsecs( attempt );
+ reset( true );
+ _d.markReset();
+ process( attempt + 1 );
+ return;
+ }
+ }
+ else if ( op == dbGetMore ) {
+ s->getMore( *this );
+ }
+ else {
+ s->writeOp( op, *this );
+ }
+ }
+
+
+ ClientInfo::ClientInfo( int clientId ) : _id( clientId ){
+ _cur = &_a;
+ _prev = &_b;
+ newRequest();
+ }
+
+ ClientInfo::~ClientInfo(){
+ boostlock lk( _clientsLock );
+ ClientCache::iterator i = _clients.find( _id );
+ if ( i != _clients.end() ){
+ _clients.erase( i );
+ }
+ }
+
+ void ClientInfo::addShard( const string& shard ){
+ _cur->insert( shard );
+ }
+
+ void ClientInfo::newRequest(){
+ _lastAccess = (int) time(0);
+
+ set<string> * temp = _cur;
+ _cur = _prev;
+ _prev = temp;
+ _cur->clear();
+ }
+
+ void ClientInfo::disconnect(){
+ _lastAccess = 0;
+ }
+
+ ClientInfo * ClientInfo::get( int clientId , bool create ){
+
+ if ( ! clientId )
+ clientId = getClientId();
+
+ if ( ! clientId ){
+ ClientInfo * info = _tlInfo.get();
+ if ( ! info ){
+ info = new ClientInfo( 0 );
+ _tlInfo.reset( info );
+ }
+ info->newRequest();
+ return info;
+ }
+
+ boostlock lk( _clientsLock );
+ ClientCache::iterator i = _clients.find( clientId );
+ if ( i != _clients.end() )
+ return i->second;
+ if ( ! create )
+ return 0;
+ ClientInfo * info = new ClientInfo( clientId );
+ _clients[clientId] = info;
+ return info;
+ }
+
+ map<int,ClientInfo*> ClientInfo::_clients;
+ boost::mutex ClientInfo::_clientsLock;
+ boost::thread_specific_ptr<ClientInfo> ClientInfo::_tlInfo;
+
+} // namespace mongo
diff --git a/s/request.h b/s/request.h
new file mode 100644
index 0000000..689216c
--- /dev/null
+++ b/s/request.h
@@ -0,0 +1,120 @@
+// request.h
+
+#pragma once
+
+#include "../stdafx.h"
+#include "../util/message.h"
+#include "../db/dbmessage.h"
+#include "config.h"
+#include "util.h"
+
+namespace mongo {
+
+ class ClientInfo;
+
+ class Request : boost::noncopyable {
+ public:
+ Request( Message& m, AbstractMessagingPort* p );
+
+ // ---- message info -----
+
+
+ const char * getns(){
+ return _d.getns();
+ }
+ int op(){
+ return _m.data->operation();
+ }
+ bool expectResponse(){
+ return op() == dbQuery || op() == dbGetMore;
+ }
+
+ MSGID id(){
+ return _id;
+ }
+
+ DBConfig * getConfig(){
+ return _config;
+ }
+ bool isShardingEnabled(){
+ return _config->isShardingEnabled();
+ }
+
+ ChunkManager * getChunkManager(){
+ return _chunkManager;
+ }
+
+ int getClientId(){
+ return _clientId;
+ }
+ ClientInfo * getClientInfo(){
+ return _clientInfo;
+ }
+
+ // ---- remote location info -----
+
+
+ string singleServerName();
+
+ const char * primaryName(){
+ return _config->getPrimary().c_str();
+ }
+
+ // ---- low level access ----
+
+ void reply( Message & response ){
+ _p->reply( _m , response , _id );
+ }
+
+ Message& m(){ return _m; }
+ DbMessage& d(){ return _d; }
+ AbstractMessagingPort* p(){ return _p; }
+
+ void process( int attempt = 0 );
+
+ private:
+
+ void reset( bool reload=false );
+
+ Message& _m;
+ DbMessage _d;
+ AbstractMessagingPort* _p;
+
+ MSGID _id;
+ DBConfig * _config;
+ ChunkManager * _chunkManager;
+
+ int _clientId;
+ ClientInfo * _clientInfo;
+ };
+
+ typedef map<int,ClientInfo*> ClientCache;
+
+ class ClientInfo {
+ public:
+ ClientInfo( int clientId );
+ ~ClientInfo();
+
+ void addShard( const string& shard );
+ set<string> * getPrev() const { return _prev; };
+
+ void newRequest();
+ void disconnect();
+
+ static ClientInfo * get( int clientId = 0 , bool create = true );
+
+ private:
+ int _id;
+ set<string> _a;
+ set<string> _b;
+ set<string> * _cur;
+ set<string> * _prev;
+ int _lastAccess;
+
+ static boost::mutex _clientsLock;
+ static ClientCache _clients;
+ static boost::thread_specific_ptr<ClientInfo> _tlInfo;
+ };
+}
+
+#include "strategy.h"
diff --git a/s/s_only.cpp b/s/s_only.cpp
new file mode 100644
index 0000000..d692ff2
--- /dev/null
+++ b/s/s_only.cpp
@@ -0,0 +1,29 @@
+// s_only.cpp
+
+/* Copyright 2009 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "../stdafx.h"
+#include "../client/dbclient.h"
+#include "../db/dbhelpers.h"
+
+namespace mongo {
+
+ auto_ptr<CursorIterator> Helpers::find( const char *ns , BSONObj query , bool requireIndex ){
+ uassert( 10196 , "Helpers::find can't be used in mongos" , 0 );
+ auto_ptr<CursorIterator> i;
+ return i;
+ }
+}
diff --git a/s/server.cpp b/s/server.cpp
new file mode 100644
index 0000000..4868caf
--- /dev/null
+++ b/s/server.cpp
@@ -0,0 +1,202 @@
+// server.cpp
+
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "stdafx.h"
+#include "../util/message.h"
+#include "../util/unittest.h"
+#include "../client/connpool.h"
+#include "../util/message_server.h"
+
+#include "server.h"
+#include "request.h"
+#include "config.h"
+#include "chunk.h"
+
+namespace mongo {
+
+ Database *database = 0;
+ string ourHostname;
+ OID serverID;
+ bool dbexitCalled = false;
+ CmdLine cmdLine;
+
+ bool inShutdown(){
+ return dbexitCalled;
+ }
+
+ string getDbContext() {
+ return "?";
+ }
+
+ bool haveLocalShardingInfo( const string& ns ){
+ assert( 0 );
+ return false;
+ }
+
+ void usage( char * argv[] ){
+ out() << argv[0] << " usage:\n\n";
+ out() << " -v+ verbose\n";
+ out() << " --port <portno>\n";
+ out() << " --configdb <configdbname> [<configdbname>...]\n";
+ out() << endl;
+ }
+
+ class ShardingConnectionHook : public DBConnectionHook {
+ public:
+ virtual void onCreate( DBClientBase * conn ){
+ conn->simpleCommand( "admin" , 0 , "switchtoclienterrors" );
+ }
+ virtual void onHandedOut( DBClientBase * conn ){
+ ClientInfo::get()->addShard( conn->getServerAddress() );
+ }
+ } shardingConnectionHook;
+
+ class ShardedMessageHandler : public MessageHandler {
+ public:
+ virtual ~ShardedMessageHandler(){}
+ virtual void process( Message& m , AbstractMessagingPort* p ){
+ Request r( m , p );
+ if ( logLevel > 5 ){
+ log(5) << "client id: " << hex << r.getClientId() << "\t" << r.getns() << "\t" << dec << r.op() << endl;
+ }
+ try {
+ setClientId( r.getClientId() );
+ r.process();
+ }
+ catch ( DBException& e ){
+ m.data->id = r.id();
+ log() << "UserException: " << e.what() << endl;
+ if ( r.expectResponse() ){
+ BSONObj err = BSON( "$err" << e.what() );
+ replyToQuery( QueryResult::ResultFlag_ErrSet, p , m , err );
+ }
+ }
+ }
+ };
+
+ void init(){
+ serverID.init();
+ setupSIGTRAPforGDB();
+ }
+
+ void start() {
+ log() << "waiting for connections on port " << cmdLine.port << endl;
+ //DbGridListener l(port);
+ //l.listen();
+ ShardedMessageHandler handler;
+ MessageServer * server = createServer( cmdLine.port , &handler );
+ server->run();
+ }
+
+ DBClientBase *createDirectClient(){
+ uassert( 10197 , "createDirectClient not implemented for sharding yet" , 0 );
+ return 0;
+ }
+
+} // namespace mongo
+
+using namespace mongo;
+
+int main(int argc, char* argv[], char *envp[] ) {
+
+ bool justTests = false;
+ vector<string> configdbs;
+
+ for (int i = 1; i < argc; i++) {
+ if ( argv[i] == 0 ) continue;
+ string s = argv[i];
+ if ( s == "--port" ) {
+ cmdLine.port = atoi(argv[++i]);
+ }
+ else if ( s == "--configdb" ) {
+
+ while ( ++i < argc )
+ configdbs.push_back(argv[i]);
+
+ if ( configdbs.size() == 0 ) {
+ out() << "error: no args for --configdb\n";
+ return 4;
+ }
+
+ if ( configdbs.size() > 2 ) {
+ out() << "error: --configdb does not support more than 2 parameters yet\n";
+ return 5;
+ }
+ }
+ else if ( s.find( "-v" ) == 0 ){
+ logLevel = s.size() - 1;
+ }
+ else if ( s == "--test" ) {
+ justTests = true;
+ logLevel = 5;
+ }
+ else {
+ usage( argv );
+ return 3;
+ }
+ }
+
+ if ( justTests ){
+ UnitTest::runTests();
+ cout << "tests passed" << endl;
+ return 0;
+ }
+
+ pool.addHook( &shardingConnectionHook );
+
+ if ( argc <= 1 ) {
+ usage( argv );
+ return 3;
+ }
+
+ bool ok = cmdLine.port != 0 && configdbs.size();
+
+ if ( !ok ) {
+ usage( argv );
+ return 1;
+ }
+
+ log() << argv[0] << " v0.3- (alpha 3t) starting (--help for usage)" << endl;
+ printGitVersion();
+ printSysInfo();
+
+ if ( ! configServer.init( configdbs ) ){
+ cout << "couldn't connectd to config db" << endl;
+ return 7;
+ }
+
+ assert( configServer.ok() );
+
+ int configError = configServer.checkConfigVersion();
+ if ( configError ){
+ cout << "config server error: " << configError << endl;
+ return configError;
+ }
+
+ init();
+ start();
+ dbexit( EXIT_CLEAN );
+ return 0;
+}
+
+#undef exit
+void mongo::dbexit( ExitCode rc, const char *why) {
+ dbexitCalled = true;
+ log() << "dbexit: " << why << " rc:" << rc << endl;
+ ::exit(rc);
+}
diff --git a/s/server.h b/s/server.h
new file mode 100644
index 0000000..067afeb
--- /dev/null
+++ b/s/server.h
@@ -0,0 +1,30 @@
+// server.h
+
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include <string>
+#include "../util/message.h"
+#include "../db/jsobj.h"
+
+namespace mongo {
+
+ extern std::string ourHostname;
+ extern OID serverID;
+
+ // from request.cpp
+ void processRequest(Message& m, MessagingPort& p);
+}
diff --git a/s/shardkey.cpp b/s/shardkey.cpp
new file mode 100644
index 0000000..15cf7b9
--- /dev/null
+++ b/s/shardkey.cpp
@@ -0,0 +1,320 @@
+// shardkey.cpp
+
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "stdafx.h"
+#include "chunk.h"
+#include "../db/jsobj.h"
+#include "../util/unittest.h"
+
+/**
+ TODO: this only works with numbers right now
+ this is very temporary, need to make work with anything
+*/
+
+namespace mongo {
+ void minForPat(BSONObjBuilder& out, const BSONObj& pat){
+ BSONElement e = pat.firstElement();
+ if (e.type() == Object){
+ BSONObjBuilder sub;
+ minForPat(sub, e.embeddedObject());
+ out.append(e.fieldName(), sub.obj());
+ } else {
+ out.appendMinKey(e.fieldName());
+ }
+ }
+
+ void maxForPat(BSONObjBuilder& out, const BSONObj& pat){
+ BSONElement e = pat.firstElement();
+ if (e.type() == Object){
+ BSONObjBuilder sub;
+ maxForPat(sub, e.embeddedObject());
+ out.append(e.fieldName(), sub.obj());
+ } else {
+ out.appendMaxKey(e.fieldName());
+ }
+ }
+
+ ShardKeyPattern::ShardKeyPattern( BSONObj p ) : pattern( p.getOwned() ) {
+ pattern.getFieldNames(patternfields);
+
+ BSONObjBuilder min;
+ minForPat(min, pattern);
+ gMin = min.obj();
+
+ BSONObjBuilder max;
+ maxForPat(max, pattern);
+ gMax = max.obj();
+ }
+
+ int ShardKeyPattern::compare( const BSONObj& lObject , const BSONObj& rObject ) {
+ BSONObj L = extractKey(lObject);
+ uassert( 10198 , "left object doesn't have shard key", !L.isEmpty());
+ BSONObj R = extractKey(rObject);
+ uassert( 10199 , "right object doesn't have shard key", !R.isEmpty());
+ return L.woCompare(R);
+ }
+
+ bool ShardKeyPattern::hasShardKey( const BSONObj& obj ) {
+ /* this is written s.t. if obj has lots of fields, if the shard key fields are early,
+ it is fast. so a bit more work to try to be semi-fast.
+ */
+
+ for(set<string>::iterator it = patternfields.begin(); it != patternfields.end(); ++it){
+ if(obj.getFieldDotted(it->c_str()).eoo())
+ return false;
+ }
+ return true;
+ }
+
+ /** @return true if shard s is relevant for query q.
+
+ Example:
+ q: { x : 3 }
+ *this: { x : 1 }
+ s: x:2..x:7
+ -> true
+ */
+
+ bool ShardKeyPattern::relevant(const BSONObj& query, const BSONObj& L, const BSONObj& R) {
+ BSONObj q = extractKey( query );
+ if( q.isEmpty() )
+ return true;
+
+ BSONElement e = q.firstElement();
+ assert( !e.eoo() ) ;
+
+ if( e.type() == RegEx ) {
+ /* todo: if starts with ^, we could be smarter here */
+ return true;
+ }
+
+ if( e.type() == Object ) {
+ BSONObjIterator j(e.embeddedObject());
+ BSONElement LE = L.firstElement(); // todo compound keys
+ BSONElement RE = R.firstElement(); // todo compound keys
+ while( 1 ) {
+ BSONElement f = j.next();
+ if( f.eoo() )
+ break;
+ int op = f.getGtLtOp();
+ switch( op ) {
+ case BSONObj::LT:
+ if( f.woCompare(LE, false) <= 0 )
+ return false;
+ break;
+ case BSONObj::LTE:
+ if( f.woCompare(LE, false) < 0 )
+ return false;
+ break;
+ case BSONObj::GT:
+ case BSONObj::GTE:
+ if( f.woCompare(RE, false) >= 0 )
+ return false;
+ break;
+ case BSONObj::opIN:
+ case BSONObj::NE:
+ case BSONObj::opSIZE:
+ massert( 10423 , "not implemented yet relevant()", false);
+ case BSONObj::Equality:
+ goto normal;
+ default:
+ massert( 10424 , "bad operator in relevant()?", false);
+ }
+ }
+ return true;
+ }
+normal:
+ return L.woCompare(q) <= 0 && R.woCompare(q) > 0;
+ }
+
+ bool ShardKeyPattern::relevantForQuery( const BSONObj& query , Chunk * chunk ){
+ massert( 10425 , "not done for compound patterns", patternfields.size() == 1);
+
+ bool rel = relevant(query, chunk->getMin(), chunk->getMax());
+ if( ! hasShardKey( query ) )
+ assert(rel);
+
+ return rel;
+ }
+
+ /**
+ returns a query that filters results only for the range desired, i.e. returns
+ { $gte : keyval(min), $lt : keyval(max) }
+ */
+ void ShardKeyPattern::getFilter( BSONObjBuilder& b , const BSONObj& min, const BSONObj& max ){
+ massert( 10426 , "not done for compound patterns", patternfields.size() == 1);
+ BSONObjBuilder temp;
+ temp.appendAs( extractKey(min).firstElement(), "$gte" );
+ temp.appendAs( extractKey(max).firstElement(), "$lt" );
+
+ b.append( patternfields.begin()->c_str(), temp.obj() );
+ }
+
+ /**
+ Example
+ sort: { ts: -1 }
+ *this: { ts:1 }
+ -> -1
+
+ @return
+ 0 if sort either doesn't have all the fields or has extra fields
+ < 0 if sort is descending
+ > 1 if sort is ascending
+ */
+ int ShardKeyPattern::canOrder( const BSONObj& sort ){
+ // e.g.:
+ // sort { a : 1 , b : -1 }
+ // pattern { a : -1, b : 1, c : 1 }
+ // -> -1
+
+ int dir = 0;
+
+ BSONObjIterator s(sort);
+ BSONObjIterator p(pattern);
+ while( 1 ) {
+ BSONElement e = s.next();
+ if( e.eoo() )
+ break;
+ if( !p.moreWithEOO() )
+ return 0;
+ BSONElement ep = p.next();
+ bool same = e == ep;
+ if( !same ) {
+ if( strcmp(e.fieldName(), ep.fieldName()) != 0 )
+ return 0;
+ // same name, but opposite direction
+ if( dir == -1 )
+ ; // ok
+ else if( dir == 1 )
+ return 0; // wrong direction for a 2nd field
+ else // dir == 0, initial pass
+ dir = -1;
+ }
+ else {
+ // fields are the same
+ if( dir == -1 )
+ return 0; // wrong direction
+ dir = 1;
+ }
+ }
+
+ return dir;
+ }
+
+ string ShardKeyPattern::toString() const {
+ return pattern.toString();
+ }
+
+ /* things to test for compound :
+ x hasshardkey
+ _ getFilter (hard?)
+ _ relevantForQuery
+ x canOrder
+ \ middle (deprecating?)
+ */
+ class ShardKeyUnitTest : public UnitTest {
+ public:
+ void hasshardkeytest() {
+ BSONObj x = fromjson("{ zid : \"abcdefg\", num: 1.0, name: \"eliot\" }");
+ ShardKeyPattern k( BSON( "num" << 1 ) );
+ assert( k.hasShardKey(x) );
+ assert( !k.hasShardKey( fromjson("{foo:'a'}") ) );
+
+ // try compound key
+ {
+ ShardKeyPattern k( fromjson("{a:1,b:-1,c:1}") );
+ assert( k.hasShardKey( fromjson("{foo:'a',a:'b',c:'z',b:9,k:99}") ) );
+ assert( !k.hasShardKey( fromjson("{foo:'a',a:'b',c:'z',bb:9,k:99}") ) );
+ assert( !k.hasShardKey( fromjson("{k:99}") ) );
+ }
+
+ }
+ void rfq() {
+ ShardKeyPattern k( BSON( "key" << 1 ) );
+ BSONObj q = BSON( "key" << 3 );
+ Chunk c(0);
+ BSONObj z = fromjson("{ ns : \"alleyinsider.fs.chunks\" , min : {key:2} , max : {key:20} , server : \"localhost:30001\" }");
+ c.unserialize(z);
+ assert( k.relevantForQuery(q, &c) );
+ assert( k.relevantForQuery(fromjson("{foo:9,key:4}"), &c) );
+ assert( !k.relevantForQuery(fromjson("{foo:9,key:43}"), &c) );
+ assert( k.relevantForQuery(fromjson("{foo:9,key:{$gt:10}}"), &c) );
+ assert( !k.relevantForQuery(fromjson("{foo:9,key:{$gt:22}}"), &c) );
+ assert( k.relevantForQuery(fromjson("{foo:9}"), &c) );
+ }
+ void getfilt() {
+ ShardKeyPattern k( BSON( "key" << 1 ) );
+ BSONObjBuilder b;
+ k.getFilter(b, fromjson("{z:3,key:30}"), fromjson("{key:90}"));
+ BSONObj x = fromjson("{ key: { $gte: 30, $lt: 90 } }");
+ assert( x.woEqual(b.obj()) );
+ }
+ void testCanOrder() {
+ ShardKeyPattern k( fromjson("{a:1,b:-1,c:1}") );
+ assert( k.canOrder( fromjson("{a:1}") ) == 1 );
+ assert( k.canOrder( fromjson("{a:-1}") ) == -1 );
+ assert( k.canOrder( fromjson("{a:1,b:-1,c:1}") ) == 1 );
+ assert( k.canOrder( fromjson("{a:1,b:1}") ) == 0 );
+ assert( k.canOrder( fromjson("{a:-1,b:1}") ) == -1 );
+ }
+ void extractkeytest() {
+ ShardKeyPattern k( fromjson("{a:1,b:-1,c:1}") );
+
+ BSONObj x = fromjson("{a:1,b:2,c:3}");
+ assert( k.extractKey( fromjson("{a:1,b:2,c:3}") ).woEqual(x) );
+ assert( k.extractKey( fromjson("{b:2,c:3,a:1}") ).woEqual(x) );
+ }
+ void run(){
+ extractkeytest();
+
+ ShardKeyPattern k( BSON( "key" << 1 ) );
+
+ BSONObj min = k.globalMin();
+
+// cout << min.jsonString(TenGen) << endl;
+
+ BSONObj max = k.globalMax();
+
+ BSONObj k1 = BSON( "key" << 5 );
+
+ assert( k.compare( min , max ) < 0 );
+ assert( k.compare( min , k1 ) < 0 );
+ assert( k.compare( max , min ) > 0 );
+ assert( k.compare( min , min ) == 0 );
+
+ hasshardkeytest();
+ assert( k.hasShardKey( k1 ) );
+ assert( ! k.hasShardKey( BSON( "key2" << 1 ) ) );
+
+ BSONObj a = k1;
+ BSONObj b = BSON( "key" << 999 );
+
+ assert( k.compare(a,b) < 0 );
+
+ assert( k.canOrder( fromjson("{key:1}") ) == 1 );
+ assert( k.canOrder( fromjson("{zz:1}") ) == 0 );
+ assert( k.canOrder( fromjson("{key:-1}") ) == -1 );
+
+ testCanOrder();
+ getfilt();
+ rfq();
+ // add middle multitype tests
+ }
+ } shardKeyTest;
+
+} // namespace mongo
diff --git a/s/shardkey.h b/s/shardkey.h
new file mode 100644
index 0000000..0c357f6
--- /dev/null
+++ b/s/shardkey.h
@@ -0,0 +1,128 @@
+// shardkey.h
+
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#pragma once
+
+#include "../client/dbclient.h"
+
+namespace mongo {
+
+ class Chunk;
+
+ /* A ShardKeyPattern is a pattern indicating what data to extract from the object to make the shard key from.
+ Analogous to an index key pattern.
+ */
+ class ShardKeyPattern {
+ public:
+ ShardKeyPattern( BSONObj p = BSONObj() );
+
+ /**
+ global min is the lowest possible value for this key
+ e.g. { num : MinKey }
+ */
+ BSONObj globalMin() const { return gMin; }
+
+ /**
+ global max is the highest possible value for this key
+ */
+ BSONObj globalMax() const { return gMax; }
+
+ bool isGlobalMin( const BSONObj& k ){
+ return k.woCompare( globalMin() ) == 0;
+ }
+
+ bool isGlobalMax( const BSONObj& k ){
+ return k.woCompare( globalMax() ) == 0;
+ }
+
+ bool isGlobal( const BSONObj& k ){
+ return isGlobalMin( k ) || isGlobalMax( k );
+ }
+
+ /** compare shard keys from the objects specified
+ l < r negative
+ l == r 0
+ l > r positive
+ */
+ int compare( const BSONObj& l , const BSONObj& r );
+
+ /**
+ @return whether or not obj has all fields in this shard key pattern
+ e.g.
+ ShardKey({num:1}).hasShardKey({ name:"joe", num:3 }) is true
+ */
+ bool hasShardKey( const BSONObj& obj );
+
+ /**
+ returns a query that filters results only for the range desired, i.e. returns
+ { "field" : { $gte: keyval(min), $lt: keyval(max) } }
+ */
+ void getFilter( BSONObjBuilder& b , const BSONObj& min, const BSONObj& max );
+
+ /** @return true if shard s is relevant for query q.
+
+ Example:
+ q: { x : 3 }
+ *this: { x : 1 }
+ s: x:2..x:7
+ -> true
+ */
+ bool relevantForQuery( const BSONObj& q , Chunk * s );
+
+ /**
+ Returns if the given sort pattern can be ordered by the shard key pattern.
+ Example
+ sort: { ts: -1 }
+ *this: { ts:1 }
+ -> -1
+
+ @return
+ 0 if sort either doesn't have all the fields or has extra fields
+ < 0 if sort is descending
+ > 1 if sort is ascending
+ */
+ int canOrder( const BSONObj& sort );
+
+ BSONObj key() { return pattern; }
+
+ string toString() const;
+
+ BSONObj extractKey(const BSONObj& from) const;
+
+ bool partOfShardKey(const string& key ) const {
+ return patternfields.count( key ) > 0;
+ }
+
+ operator string() const {
+ return pattern.toString();
+ }
+ private:
+ BSONObj pattern;
+ BSONObj gMin;
+ BSONObj gMax;
+
+ /* question: better to have patternfields precomputed or not? depends on if we use copy contructor often. */
+ set<string> patternfields;
+ bool relevant(const BSONObj& query, const BSONObj& L, const BSONObj& R);
+ };
+
+ inline BSONObj ShardKeyPattern::extractKey(const BSONObj& from) const {
+ return from.extractFields(pattern);
+ }
+
+}
diff --git a/s/strategy.cpp b/s/strategy.cpp
new file mode 100644
index 0000000..b485bd2
--- /dev/null
+++ b/s/strategy.cpp
@@ -0,0 +1,221 @@
+// stragegy.cpp
+
+#include "stdafx.h"
+#include "request.h"
+#include "../util/background.h"
+#include "../client/connpool.h"
+#include "../db/commands.h"
+#include "server.h"
+
+namespace mongo {
+
+ // ----- Strategy ------
+
+ void Strategy::doWrite( int op , Request& r , string server ){
+ ScopedDbConnection dbcon( server );
+ DBClientBase &_c = dbcon.conn();
+
+ /* TODO FIX - do not case and call DBClientBase::say() */
+ DBClientConnection&c = dynamic_cast<DBClientConnection&>(_c);
+ c.port().say( r.m() );
+
+ dbcon.done();
+ }
+
+ void Strategy::doQuery( Request& r , string server ){
+ try{
+ ScopedDbConnection dbcon( server );
+ DBClientBase &_c = dbcon.conn();
+
+ checkShardVersion( _c , r.getns() );
+
+ // TODO: This will not work with Paired connections. Fix.
+ DBClientConnection&c = dynamic_cast<DBClientConnection&>(_c);
+ Message response;
+ bool ok = c.port().call( r.m(), response);
+
+ {
+ QueryResult *qr = (QueryResult *) response.data;
+ if ( qr->resultFlags() & QueryResult::ResultFlag_ShardConfigStale ){
+ dbcon.done();
+ throw StaleConfigException( r.getns() , "Strategy::doQuery" );
+ }
+ }
+
+ uassert( 10200 , "mongos: error calling db", ok);
+ r.reply( response );
+ dbcon.done();
+ }
+ catch ( AssertionException& e ) {
+ BSONObjBuilder err;
+ err.append("$err", string("mongos: ") + (e.msg.empty() ? "assertion during query" : e.msg));
+ BSONObj errObj = err.done();
+ replyToQuery(QueryResult::ResultFlag_ErrSet, r.p() , r.m() , errObj);
+ }
+ }
+
+ void Strategy::insert( string server , const char * ns , const BSONObj& obj ){
+ ScopedDbConnection dbcon( server );
+ checkShardVersion( dbcon.conn() , ns );
+ dbcon->insert( ns , obj );
+ dbcon.done();
+ }
+
+ map<DBClientBase*,unsigned long long> checkShardVersionLastSequence;
+
+ class WriteBackListener : public BackgroundJob {
+ protected:
+
+ WriteBackListener( const string& addr ) : _addr( addr ){
+ cout << "creating WriteBackListener for: " << addr << endl;
+ }
+
+ void run(){
+ int secsToSleep = 0;
+ while ( 1 ){
+ try {
+ ScopedDbConnection conn( _addr );
+
+ BSONObj result;
+
+ {
+ BSONObjBuilder cmd;
+ cmd.appendOID( "writebacklisten" , &serverID );
+ if ( ! conn->runCommand( "admin" , cmd.obj() , result ) ){
+ log() << "writebacklisten command failed! " << result << endl;
+ conn.done();
+ continue;
+ }
+
+ }
+
+ log(1) << "writebacklisten result: " << result << endl;
+
+ BSONObj data = result.getObjectField( "data" );
+ if ( data.getBoolField( "writeBack" ) ){
+ string ns = data["ns"].valuestrsafe();
+
+ int len;
+
+ Message m( (void*)data["msg"].binData( len ) , false );
+ massert( 10427 , "invalid writeback message" , m.data->valid() );
+
+ grid.getDBConfig( ns )->getChunkManager( ns , true );
+
+ Request r( m , 0 );
+ r.process();
+ }
+ else {
+ log() << "unknown writeBack result: " << result << endl;
+ }
+
+ conn.done();
+ secsToSleep = 0;
+ }
+ catch ( std::exception e ){
+ log() << "WriteBackListener exception : " << e.what() << endl;
+ }
+ catch ( ... ){
+ log() << "WriteBackListener uncaught exception!" << endl;
+ }
+ secsToSleep++;
+ sleepsecs(secsToSleep);
+ if ( secsToSleep > 10 )
+ secsToSleep = 0;
+ }
+ }
+
+ private:
+ string _addr;
+ static map<string,WriteBackListener*> _cache;
+
+ public:
+ static void init( DBClientBase& conn ){
+ WriteBackListener*& l = _cache[conn.getServerAddress()];
+ if ( l )
+ return;
+ l = new WriteBackListener( conn.getServerAddress() );
+ l->go();
+ }
+
+ };
+
+ map<string,WriteBackListener*> WriteBackListener::_cache;
+
+
+ void checkShardVersion( DBClientBase& conn , const string& ns , bool authoritative ){
+ // TODO: cache, optimize, etc...
+
+ WriteBackListener::init( conn );
+
+ DBConfig * conf = grid.getDBConfig( ns );
+ if ( ! conf )
+ return;
+
+ ShardChunkVersion version = 0;
+ unsigned long long officialSequenceNumber = 0;
+
+ if ( conf->isSharded( ns ) ){
+ ChunkManager * manager = conf->getChunkManager( ns , authoritative );
+ officialSequenceNumber = manager->getSequenceNumber();
+ version = manager->getVersion( conn.getServerAddress() );
+ }
+
+ unsigned long long & sequenceNumber = checkShardVersionLastSequence[ &conn ];
+ if ( officialSequenceNumber == sequenceNumber )
+ return;
+
+ log(2) << " have to set shard version for conn: " << &conn << " ns:" << ns << " my last seq: " << sequenceNumber << " current: " << officialSequenceNumber << endl;
+
+ BSONObj result;
+ if ( setShardVersion( conn , ns , version , authoritative , result ) ){
+ // success!
+ log(1) << " setShardVersion success!" << endl;
+ sequenceNumber = officialSequenceNumber;
+ return;
+ }
+
+ log(1) << " setShardVersion failed!\n" << result << endl;
+
+ if ( result.getBoolField( "need_authoritative" ) )
+ massert( 10428 , "need_authoritative set but in authoritative mode already" , ! authoritative );
+
+ if ( ! authoritative ){
+ checkShardVersion( conn , ns , 1 );
+ return;
+ }
+
+ log(1) << " setShardVersion failed: " << result << endl;
+ massert( 10429 , "setShardVersion failed!" , 0 );
+ }
+
+ bool setShardVersion( DBClientBase & conn , const string& ns , ShardChunkVersion version , bool authoritative , BSONObj& result ){
+
+ BSONObjBuilder cmdBuilder;
+ cmdBuilder.append( "setShardVersion" , ns.c_str() );
+ cmdBuilder.append( "configdb" , configServer.modelServer() );
+ cmdBuilder.appendTimestamp( "version" , version );
+ cmdBuilder.appendOID( "serverID" , &serverID );
+ if ( authoritative )
+ cmdBuilder.appendBool( "authoritative" , 1 );
+ BSONObj cmd = cmdBuilder.obj();
+
+ log(1) << " setShardVersion " << conn.getServerAddress() << " " << ns << " " << cmd << " " << &conn << endl;
+
+ return conn.runCommand( "admin" , cmd , result );
+ }
+
+ bool lockNamespaceOnServer( const string& server , const string& ns ){
+ ScopedDbConnection conn( server );
+ bool res = lockNamespaceOnServer( conn.conn() , ns );
+ conn.done();
+ return res;
+ }
+
+ bool lockNamespaceOnServer( DBClientBase& conn , const string& ns ){
+ BSONObj lockResult;
+ return setShardVersion( conn , ns , grid.getNextOpTime() , true , lockResult );
+ }
+
+
+}
diff --git a/s/strategy.h b/s/strategy.h
new file mode 100644
index 0000000..e4b93b5
--- /dev/null
+++ b/s/strategy.h
@@ -0,0 +1,36 @@
+// strategy.h
+
+#pragma once
+
+#include "../stdafx.h"
+#include "chunk.h"
+#include "request.h"
+
+namespace mongo {
+
+ class Strategy {
+ public:
+ Strategy(){}
+ virtual ~Strategy() {}
+ virtual void queryOp( Request& r ) = 0;
+ virtual void getMore( Request& r ) = 0;
+ virtual void writeOp( int op , Request& r ) = 0;
+
+ protected:
+ void doWrite( int op , Request& r , string server );
+ void doQuery( Request& r , string server );
+
+ void insert( string server , const char * ns , const BSONObj& obj );
+
+ };
+
+ extern Strategy * SINGLE;
+ extern Strategy * SHARDED;
+
+ bool setShardVersion( DBClientBase & conn , const string& ns , ShardChunkVersion version , bool authoritative , BSONObj& result );
+
+ bool lockNamespaceOnServer( const string& server , const string& ns );
+ bool lockNamespaceOnServer( DBClientBase& conn , const string& ns );
+
+}
+
diff --git a/s/strategy_shard.cpp b/s/strategy_shard.cpp
new file mode 100644
index 0000000..34cf226
--- /dev/null
+++ b/s/strategy_shard.cpp
@@ -0,0 +1,260 @@
+// strategy_sharded.cpp
+
+#include "stdafx.h"
+#include "request.h"
+#include "chunk.h"
+#include "cursors.h"
+#include "../client/connpool.h"
+#include "../db/commands.h"
+
+// error codes 8010-8040
+
+namespace mongo {
+
+ class ShardStrategy : public Strategy {
+
+ virtual void queryOp( Request& r ){
+ QueryMessage q( r.d() );
+
+ log(3) << "shard query: " << q.ns << " " << q.query << endl;
+
+ if ( q.ntoreturn == 1 && strstr(q.ns, ".$cmd") )
+ throw UserException( 8010 , "something is wrong, shouldn't see a command here" );
+
+ ChunkManager * info = r.getChunkManager();
+ assert( info );
+
+ Query query( q.query );
+
+ vector<Chunk*> shards;
+ info->getChunksForQuery( shards , query.getFilter() );
+
+ set<ServerAndQuery> servers;
+ map<string,int> serverCounts;
+ for ( vector<Chunk*>::iterator i = shards.begin(); i != shards.end(); i++ ){
+ servers.insert( ServerAndQuery( (*i)->getShard() , (*i)->getFilter() ) );
+ int& num = serverCounts[(*i)->getShard()];
+ num++;
+ }
+
+ if ( logLevel > 4 ){
+ StringBuilder ss;
+ ss << " shard query servers: " << servers.size() << "\n";
+ for ( set<ServerAndQuery>::iterator i = servers.begin(); i!=servers.end(); i++ ){
+ const ServerAndQuery& s = *i;
+ ss << " " << s.toString() << "\n";
+ }
+ log() << ss.str();
+ }
+
+ ClusteredCursor * cursor = 0;
+
+ BSONObj sort = query.getSort();
+
+ if ( sort.isEmpty() ){
+ // 1. no sort, can just hit them in serial
+ cursor = new SerialServerClusteredCursor( servers , q );
+ }
+ else {
+ int shardKeyOrder = info->getShardKey().canOrder( sort );
+ if ( shardKeyOrder ){
+ // 2. sort on shard key, can do in serial intelligently
+ set<ServerAndQuery> buckets;
+ for ( vector<Chunk*>::iterator i = shards.begin(); i != shards.end(); i++ ){
+ Chunk * s = *i;
+ buckets.insert( ServerAndQuery( s->getShard() , s->getFilter() , s->getMin() ) );
+ }
+ cursor = new SerialServerClusteredCursor( buckets , q , shardKeyOrder );
+ }
+ else {
+ // 3. sort on non-sharded key, pull back a portion from each server and iterate slowly
+ cursor = new ParallelSortClusteredCursor( servers , q , sort );
+ }
+ }
+
+ assert( cursor );
+
+ log(5) << " cursor type: " << cursor->type() << endl;
+
+ ShardedClientCursor * cc = new ShardedClientCursor( q , cursor );
+ if ( ! cc->sendNextBatch( r ) ){
+ delete( cursor );
+ return;
+ }
+ log(6) << "storing cursor : " << cc->getId() << endl;
+ cursorCache.store( cc );
+ }
+
+ virtual void getMore( Request& r ){
+ int ntoreturn = r.d().pullInt();
+ long long id = r.d().pullInt64();
+
+ log(6) << "want cursor : " << id << endl;
+
+ ShardedClientCursor * cursor = cursorCache.get( id );
+ if ( ! cursor ){
+ log(6) << "\t invalid cursor :(" << endl;
+ replyToQuery( QueryResult::ResultFlag_CursorNotFound , r.p() , r.m() , 0 , 0 , 0 );
+ return;
+ }
+
+ if ( cursor->sendNextBatch( r , ntoreturn ) ){
+ log(6) << "\t cursor finished: " << id << endl;
+ return;
+ }
+
+ delete( cursor );
+ cursorCache.remove( id );
+ }
+
+ void _insert( Request& r , DbMessage& d, ChunkManager* manager ){
+
+ while ( d.moreJSObjs() ){
+ BSONObj o = d.nextJsObj();
+ if ( ! manager->hasShardKey( o ) ){
+
+ bool bad = true;
+
+ if ( manager->getShardKey().partOfShardKey( "_id" ) ){
+ BSONObjBuilder b;
+ b.appendOID( "_id" , 0 , true );
+ b.appendElements( o );
+ o = b.obj();
+ bad = ! manager->hasShardKey( o );
+ }
+
+ if ( bad ){
+ log() << "tried to insert object without shard key: " << r.getns() << " " << o << endl;
+ throw UserException( 8011 , "tried to insert object without shard key" );
+ }
+
+ }
+
+ Chunk& c = manager->findChunk( o );
+ log(4) << " server:" << c.getShard() << " " << o << endl;
+ insert( c.getShard() , r.getns() , o );
+
+ c.splitIfShould( o.objsize() );
+ }
+ }
+
+ void _update( Request& r , DbMessage& d, ChunkManager* manager ){
+ int flags = d.pullInt();
+
+ BSONObj query = d.nextJsObj();
+ uassert( 10201 , "invalid update" , d.moreJSObjs() );
+ BSONObj toupdate = d.nextJsObj();
+
+ BSONObj chunkFinder = query;
+
+ bool upsert = flags & UpdateOption_Upsert;
+ bool multi = flags & UpdateOption_Multi;
+
+ if ( multi )
+ uassert( 10202 , "can't mix multi and upsert and sharding" , ! upsert );
+
+ if ( upsert && !(manager->hasShardKey(toupdate) ||
+ (toupdate.firstElement().fieldName()[0] == '$' && manager->hasShardKey(query))))
+ {
+ throw UserException( 8012 , "can't upsert something without shard key" );
+ }
+
+ bool save = false;
+ if ( ! manager->hasShardKey( query ) ){
+ if ( multi ){
+ }
+ else if ( query.nFields() != 1 || strcmp( query.firstElement().fieldName() , "_id" ) ){
+ throw UserException( 8013 , "can't do update with query that doesn't have the shard key" );
+ }
+ else {
+ save = true;
+ chunkFinder = toupdate;
+ }
+ }
+
+
+ if ( ! save ){
+ if ( toupdate.firstElement().fieldName()[0] == '$' ){
+ // TODO: check for $set, etc.. on shard key
+ }
+ else if ( manager->hasShardKey( toupdate ) && manager->getShardKey().compare( query , toupdate ) ){
+ throw UserException( 8014 , "change would move shards!" );
+ }
+ }
+
+ if ( multi ){
+ vector<Chunk*> chunks;
+ manager->getChunksForQuery( chunks , chunkFinder );
+ set<string> seen;
+ for ( vector<Chunk*>::iterator i=chunks.begin(); i!=chunks.end(); i++){
+ Chunk * c = *i;
+ if ( seen.count( c->getShard() ) )
+ continue;
+ doWrite( dbUpdate , r , c->getShard() );
+ seen.insert( c->getShard() );
+ }
+ }
+ else {
+ Chunk& c = manager->findChunk( chunkFinder );
+ doWrite( dbUpdate , r , c.getShard() );
+ c.splitIfShould( d.msg().data->dataLen() );
+ }
+
+ }
+
+ void _delete( Request& r , DbMessage& d, ChunkManager* manager ){
+
+ int flags = d.pullInt();
+ bool justOne = flags & 1;
+
+ uassert( 10203 , "bad delete message" , d.moreJSObjs() );
+ BSONObj pattern = d.nextJsObj();
+
+ vector<Chunk*> chunks;
+ manager->getChunksForQuery( chunks , pattern );
+ cout << "delete : " << pattern << " \t " << chunks.size() << " justOne: " << justOne << endl;
+ if ( chunks.size() == 1 ){
+ doWrite( dbDelete , r , chunks[0]->getShard() );
+ return;
+ }
+
+ if ( justOne && ! pattern.hasField( "_id" ) )
+ throw UserException( 8015 , "can only delete with a non-shard key pattern if can delete as many as we find" );
+
+ set<string> seen;
+ for ( vector<Chunk*>::iterator i=chunks.begin(); i!=chunks.end(); i++){
+ Chunk * c = *i;
+ if ( seen.count( c->getShard() ) )
+ continue;
+ seen.insert( c->getShard() );
+ doWrite( dbDelete , r , c->getShard() );
+ }
+ }
+
+ virtual void writeOp( int op , Request& r ){
+ const char *ns = r.getns();
+ log(3) << "write: " << ns << endl;
+
+ DbMessage& d = r.d();
+ ChunkManager * info = r.getChunkManager();
+ assert( info );
+
+ if ( op == dbInsert ){
+ _insert( r , d , info );
+ }
+ else if ( op == dbUpdate ){
+ _update( r , d , info );
+ }
+ else if ( op == dbDelete ){
+ _delete( r , d , info );
+ }
+ else {
+ log() << "sharding can't do write op: " << op << endl;
+ throw UserException( 8016 , "can't do this write op on sharded collection" );
+ }
+
+ }
+ };
+
+ Strategy * SHARDED = new ShardStrategy();
+}
diff --git a/s/strategy_single.cpp b/s/strategy_single.cpp
new file mode 100644
index 0000000..9cf8a63
--- /dev/null
+++ b/s/strategy_single.cpp
@@ -0,0 +1,131 @@
+// strategy_simple.cpp
+
+#include "stdafx.h"
+#include "request.h"
+#include "../client/connpool.h"
+#include "../db/commands.h"
+
+namespace mongo {
+
+ class SingleStrategy : public Strategy {
+
+ public:
+ SingleStrategy(){
+ _commandsSafeToPass.insert( "$eval" );
+ _commandsSafeToPass.insert( "create" );
+ }
+
+ private:
+ virtual void queryOp( Request& r ){
+ QueryMessage q( r.d() );
+
+ bool lateAssert = false;
+
+ log(3) << "single query: " << q.ns << " " << q.query << " ntoreturn: " << q.ntoreturn << endl;
+
+ try {
+ if ( ( q.ntoreturn == -1 || q.ntoreturn == 1 ) && strstr(q.ns, ".$cmd") ) {
+ BSONObjBuilder builder;
+ bool ok = Command::runAgainstRegistered(q.ns, q.query, builder);
+ if ( ok ) {
+ BSONObj x = builder.done();
+ replyToQuery(0, r.p(), r.m(), x);
+ return;
+ }
+
+ string commandName = q.query.firstElement().fieldName();
+ if ( ! _commandsSafeToPass.count( commandName ) )
+ log() << "passing through unknown command: " << commandName << " " << q.query << endl;
+ }
+
+ lateAssert = true;
+ doQuery( r , r.singleServerName() );
+ }
+ catch ( AssertionException& e ) {
+ assert( !lateAssert );
+ BSONObjBuilder err;
+ err.append("$err", string("mongos: ") + (e.msg.empty() ? "assertion during query" : e.msg));
+ BSONObj errObj = err.done();
+ replyToQuery(QueryResult::ResultFlag_ErrSet, r.p() , r.m() , errObj);
+ return;
+ }
+
+ }
+
+ virtual void getMore( Request& r ){
+ const char *ns = r.getns();
+
+ log(3) << "single getmore: " << ns << endl;
+
+ ScopedDbConnection dbcon( r.singleServerName() );
+ DBClientBase& _c = dbcon.conn();
+
+ // TODO
+ DBClientConnection &c = dynamic_cast<DBClientConnection&>(_c);
+
+ Message response;
+ bool ok = c.port().call( r.m() , response);
+ uassert( 10204 , "dbgrid: getmore: error calling db", ok);
+ r.reply( response );
+
+ dbcon.done();
+
+ }
+
+ void handleIndexWrite( int op , Request& r ){
+
+ DbMessage& d = r.d();
+
+ if ( op == dbInsert ){
+ while( d.moreJSObjs() ){
+ BSONObj o = d.nextJsObj();
+ const char * ns = o["ns"].valuestr();
+ if ( r.getConfig()->isSharded( ns ) ){
+ uassert( 10205 , (string)"can't use unique indexes with sharding ns:" + ns +
+ " key: " + o["key"].embeddedObjectUserCheck().toString() ,
+ IndexDetails::isIdIndexPattern( o["key"].embeddedObjectUserCheck() ) ||
+ ! o["unique"].trueValue() );
+ ChunkManager * cm = r.getConfig()->getChunkManager( ns );
+ assert( cm );
+ for ( int i=0; i<cm->numChunks();i++)
+ doWrite( op , r , cm->getChunk(i)->getShard() );
+ }
+ else {
+ doWrite( op , r , r.singleServerName() );
+ }
+ }
+ }
+ else if ( op == dbUpdate ){
+ throw UserException( 8050 , "can't update system.indexes" );
+ }
+ else if ( op == dbDelete ){
+ // TODO
+ throw UserException( 8051 , "can't delete indexes on sharded collection yet" );
+ }
+ else {
+ log() << "handleIndexWrite invalid write op: " << op << endl;
+ throw UserException( 8052 , "handleIndexWrite invalid write op" );
+ }
+
+ }
+
+ virtual void writeOp( int op , Request& r ){
+ const char *ns = r.getns();
+
+ if ( r.isShardingEnabled() &&
+ strstr( ns , ".system.indexes" ) == strstr( ns , "." ) &&
+ strstr( ns , "." ) ){
+ log(1) << " .system.indexes write for: " << ns << endl;
+ handleIndexWrite( op , r );
+ return;
+ }
+
+ log(3) << "single write: " << ns << endl;
+ doWrite( op , r , r.singleServerName() );
+ }
+
+ set<string> _commandsSafeToPass;
+ };
+
+ Strategy * SINGLE = new SingleStrategy();
+}
diff --git a/s/util.h b/s/util.h
new file mode 100644
index 0000000..ba40a29
--- /dev/null
+++ b/s/util.h
@@ -0,0 +1,51 @@
+// util.h
+
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#pragma once
+
+#include "../stdafx.h"
+#include "../client/dbclient.h"
+
+/**
+ some generic sharding utils that can be used in mongod or mongos
+ */
+
+namespace mongo {
+
+ /**
+ your config info for a given shard/chunk is out of date */
+ class StaleConfigException : public std::exception {
+ public:
+ StaleConfigException( const string& ns , const string& msg){
+ stringstream s;
+ s << "StaleConfigException ns: " << ns << " " << msg;
+ _msg = s.str();
+ }
+
+ virtual ~StaleConfigException() throw(){}
+
+ virtual const char* what() const throw(){
+ return _msg.c_str();
+ }
+ private:
+ string _msg;
+ };
+
+ void checkShardVersion( DBClientBase & conn , const string& ns , bool authoritative = false );
+
+}