diff options
Diffstat (limited to 's')
-rw-r--r-- | s/chunk.cpp | 628 | ||||
-rw-r--r-- | s/chunk.h | 221 | ||||
-rw-r--r-- | s/commands_admin.cpp | 696 | ||||
-rw-r--r-- | s/commands_public.cpp | 368 | ||||
-rw-r--r-- | s/config.cpp | 535 | ||||
-rw-r--r-- | s/config.h | 195 | ||||
-rw-r--r-- | s/cursors.cpp | 104 | ||||
-rw-r--r-- | s/cursors.h | 56 | ||||
-rw-r--r-- | s/d_logic.cpp | 542 | ||||
-rw-r--r-- | s/d_logic.h | 23 | ||||
-rw-r--r-- | s/d_util.cpp | 36 | ||||
-rw-r--r-- | s/dbgrid.vcproj | 660 | ||||
-rw-r--r-- | s/dbgrid.vcxproj | 201 | ||||
-rw-r--r-- | s/request.cpp | 175 | ||||
-rw-r--r-- | s/request.h | 120 | ||||
-rw-r--r-- | s/s_only.cpp | 29 | ||||
-rw-r--r-- | s/server.cpp | 202 | ||||
-rw-r--r-- | s/server.h | 30 | ||||
-rw-r--r-- | s/shardkey.cpp | 320 | ||||
-rw-r--r-- | s/shardkey.h | 128 | ||||
-rw-r--r-- | s/strategy.cpp | 221 | ||||
-rw-r--r-- | s/strategy.h | 36 | ||||
-rw-r--r-- | s/strategy_shard.cpp | 260 | ||||
-rw-r--r-- | s/strategy_single.cpp | 131 | ||||
-rw-r--r-- | s/util.h | 51 |
25 files changed, 5968 insertions, 0 deletions
diff --git a/s/chunk.cpp b/s/chunk.cpp new file mode 100644 index 0000000..47c13e8 --- /dev/null +++ b/s/chunk.cpp @@ -0,0 +1,628 @@ +// shard.cpp + +/** + * Copyright (C) 2008 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "stdafx.h" +#include "chunk.h" +#include "config.h" +#include "../util/unittest.h" +#include "../client/connpool.h" +#include "cursors.h" +#include "strategy.h" + +namespace mongo { + + // ------- Shard -------- + + long Chunk::MaxChunkSize = 1024 * 1204 * 50; + + Chunk::Chunk( ChunkManager * manager ) : _manager( manager ){ + _modified = false; + _lastmod = 0; + _dataWritten = 0; + } + + void Chunk::setShard( string s ){ + _shard = s; + _markModified(); + } + + bool Chunk::contains( const BSONObj& obj ){ + return + _manager->getShardKey().compare( getMin() , obj ) <= 0 && + _manager->getShardKey().compare( obj , getMax() ) < 0; + } + + BSONObj Chunk::pickSplitPoint(){ + int sort = 0; + + if ( _manager->getShardKey().globalMin().woCompare( getMin() ) == 0 ){ + sort = 1; + } + else if ( _manager->getShardKey().globalMax().woCompare( getMax() ) == 0 ){ + sort = -1; + } + + if ( sort ){ + ScopedDbConnection conn( getShard() ); + Query q; + if ( sort == 1 ) + q.sort( _manager->getShardKey().key() ); + else { + BSONObj k = _manager->getShardKey().key(); + BSONObjBuilder r; + + BSONObjIterator i(k); + while( i.more() ) { + BSONElement e = i.next(); + uassert( 10163 
, "can only handle numbers here - which i think is correct" , e.isNumber() ); + r.append( e.fieldName() , -1 * e.number() ); + } + + q.sort( r.obj() ); + } + BSONObj end = conn->findOne( _ns , q ); + conn.done(); + + if ( ! end.isEmpty() ) + return _manager->getShardKey().extractKey( end ); + } + + ScopedDbConnection conn( getShard() ); + BSONObj result; + if ( ! conn->runCommand( "admin" , BSON( "medianKey" << _ns + << "keyPattern" << _manager->getShardKey().key() + << "min" << getMin() + << "max" << getMax() + ) , result ) ){ + stringstream ss; + ss << "medianKey command failed: " << result; + uassert( 10164 , ss.str() , 0 ); + } + conn.done(); + + return result.getObjectField( "median" ).getOwned(); + } + + Chunk * Chunk::split(){ + return split( pickSplitPoint() ); + } + + Chunk * Chunk::split( const BSONObj& m ){ + uassert( 10165 , "can't split as shard that doesn't have a manager" , _manager ); + + log(1) << " before split on: " << m << "\n" + << "\t self : " << toString() << endl; + + uassert( 10166 , "locking namespace on server failed" , lockNamespaceOnServer( getShard() , _ns ) ); + + Chunk * s = new Chunk( _manager ); + s->_ns = _ns; + s->_shard = _shard; + s->setMin(m.getOwned()); + s->setMax(_max); + + s->_markModified(); + _markModified(); + + _manager->_chunks.push_back( s ); + + setMax(m.getOwned()); + + log(1) << " after split:\n" + << "\t left : " << toString() << "\n" + << "\t right: "<< s->toString() << endl; + + + _manager->save(); + + return s; + } + + bool Chunk::moveAndCommit( const string& to , string& errmsg ){ + uassert( 10167 , "can't move shard to its current location!" 
, to != getShard() ); + + log() << "moving chunk ns: " << _ns << " moving chunk: " << toString() << " " << _shard << " -> " << to << endl; + + string from = _shard; + ShardChunkVersion oldVersion = _manager->getVersion( from ); + + BSONObj filter; + { + BSONObjBuilder b; + getFilter( b ); + filter = b.obj(); + } + + ScopedDbConnection fromconn( from ); + + BSONObj startRes; + bool worked = fromconn->runCommand( "admin" , + BSON( "movechunk.start" << _ns << + "from" << from << + "to" << to << + "filter" << filter + ) , + startRes + ); + + if ( ! worked ){ + errmsg = (string)"movechunk.start failed: " + startRes.toString(); + fromconn.done(); + return false; + } + + // update config db + setShard( to ); + + // need to increment version # for old server + Chunk * randomChunkOnOldServer = _manager->findChunkOnServer( from ); + if ( randomChunkOnOldServer ) + randomChunkOnOldServer->_markModified(); + + _manager->save(); + + BSONObj finishRes; + { + + ShardChunkVersion newVersion = _manager->getVersion( from ); + if ( newVersion == 0 && oldVersion > 0 ){ + newVersion = oldVersion; + newVersion++; + _manager->save(); + } + else if ( newVersion <= oldVersion ){ + log() << "newVersion: " << newVersion << " oldVersion: " << oldVersion << endl; + uassert( 10168 , "version has to be higher" , newVersion > oldVersion ); + } + + BSONObjBuilder b; + b << "movechunk.finish" << _ns; + b << "to" << to; + b.appendTimestamp( "newVersion" , newVersion ); + b.append( startRes["finishToken"] ); + + worked = fromconn->runCommand( "admin" , + b.done() , + finishRes ); + } + + if ( ! 
worked ){ + errmsg = (string)"movechunk.finish failed: " + finishRes.toString(); + fromconn.done(); + return false; + } + + fromconn.done(); + return true; + } + + bool Chunk::splitIfShould( long dataWritten ){ + _dataWritten += dataWritten; + + if ( _dataWritten < MaxChunkSize / 5 ) + return false; + + _dataWritten = 0; + + if ( _min.woCompare( _max ) == 0 ){ + log() << "SHARD PROBLEM** shard is too big, but can't split: " << toString() << endl; + return false; + } + + long size = getPhysicalSize(); + if ( size < MaxChunkSize ) + return false; + + log() << "autosplitting " << _ns << " size: " << size << " shard: " << toString() << endl; + Chunk * newShard = split(); + + moveIfShould( newShard ); + + return true; + } + + bool Chunk::moveIfShould( Chunk * newChunk ){ + Chunk * toMove = 0; + + if ( newChunk->countObjects() <= 1 ){ + toMove = newChunk; + } + else if ( this->countObjects() <= 1 ){ + toMove = this; + } + else { + log(1) << "don't know how to decide if i should move inner shard" << endl; + } + + if ( ! toMove ) + return false; + + string newLocation = grid.pickShardForNewDB(); + if ( newLocation == getShard() ){ + // if this is the best server, then we shouldn't do anything! + log(1) << "not moving chunk: " << toString() << " b/c would move to same place " << newLocation << " -> " << getShard() << endl; + return 0; + } + + log() << "moving chunk (auto): " << toMove->toString() << " to: " << newLocation << " #objcets: " << toMove->countObjects() << endl; + + string errmsg; + massert( 10412 , (string)"moveAndCommit failed: " + errmsg , + toMove->moveAndCommit( newLocation , errmsg ) ); + + return true; + } + + long Chunk::getPhysicalSize(){ + ScopedDbConnection conn( getShard() ); + + BSONObj result; + uassert( 10169 , "datasize failed!" 
, conn->runCommand( "admin" , BSON( "datasize" << _ns + << "keyPattern" << _manager->getShardKey().key() + << "min" << getMin() + << "max" << getMax() + ) , result ) ); + + conn.done(); + return (long)result["size"].number(); + } + + + long Chunk::countObjects( const BSONObj& filter ){ + ScopedDbConnection conn( getShard() ); + + BSONObj f = getFilter(); + if ( ! filter.isEmpty() ) + f = ClusteredCursor::concatQuery( f , filter ); + + BSONObj result; + unsigned long long n = conn->count( _ns , f ); + + conn.done(); + return (long)n; + } + + bool Chunk::operator==( const Chunk& s ){ + return + _manager->getShardKey().compare( _min , s._min ) == 0 && + _manager->getShardKey().compare( _max , s._max ) == 0 + ; + } + + void Chunk::getFilter( BSONObjBuilder& b ){ + _manager->_key.getFilter( b , _min , _max ); + } + + void Chunk::serialize(BSONObjBuilder& to){ + if ( _lastmod ) + to.appendTimestamp( "lastmod" , _lastmod ); + else + to.appendTimestamp( "lastmod" ); + + to << "ns" << _ns; + to << "min" << _min; + to << "max" << _max; + to << "shard" << _shard; + } + + void Chunk::unserialize(const BSONObj& from){ + _ns = from.getStringField( "ns" ); + _shard = from.getStringField( "shard" ); + _lastmod = from.hasField( "lastmod" ) ? from["lastmod"]._numberLong() : 0; + + BSONElement e = from["minDotted"]; + cout << from << endl; + if (e.eoo()){ + _min = from.getObjectField( "min" ).getOwned(); + _max = from.getObjectField( "max" ).getOwned(); + } else { // TODO delete this case after giving people a chance to migrate + _min = e.embeddedObject().getOwned(); + _max = from.getObjectField( "maxDotted" ).getOwned(); + } + + uassert( 10170 , "Chunk needs a ns" , ! _ns.empty() ); + uassert( 10171 , "Chunk needs a server" , ! _ns.empty() ); + + uassert( 10172 , "Chunk needs a min" , ! _min.isEmpty() ); + uassert( 10173 , "Chunk needs a max" , ! _max.isEmpty() ); + } + + string Chunk::modelServer() { + // TODO: this could move around? 
+ return configServer.modelServer(); + } + + void Chunk::_markModified(){ + _modified = true; + // set to 0 so that the config server sets it + _lastmod = 0; + } + + void Chunk::save( bool check ){ + bool reload = ! _lastmod; + Model::save( check ); + if ( reload ){ + // need to do this so that we get the new _lastMod and therefore version number + massert( 10413 , "_id has to be filled in already" , ! _id.isEmpty() ); + + string b = toString(); + BSONObj q = _id.copy(); + massert( 10414 , "how could load fail?" , load( q ) ); + log(2) << "before: " << q << "\t" << b << endl; + log(2) << "after : " << _id << "\t" << toString() << endl; + massert( 10415 , "chunk reload changed content!" , b == toString() ); + massert( 10416 , "id changed!" , q["_id"] == _id["_id"] ); + } + } + + void Chunk::ensureIndex(){ + ScopedDbConnection conn( getShard() ); + conn->ensureIndex( _ns , _manager->getShardKey().key() , _manager->_unique ); + conn.done(); + } + + string Chunk::toString() const { + stringstream ss; + ss << "shard ns:" << _ns << " shard: " << _shard << " min: " << _min << " max: " << _max; + return ss.str(); + } + + + ShardKeyPattern Chunk::skey(){ + return _manager->getShardKey(); + } + + // ------- ChunkManager -------- + + unsigned long long ChunkManager::NextSequenceNumber = 1; + + ChunkManager::ChunkManager( DBConfig * config , string ns , ShardKeyPattern pattern , bool unique ) : + _config( config ) , _ns( ns ) , _key( pattern ) , _unique( unique ){ + Chunk temp(0); + + ScopedDbConnection conn( temp.modelServer() ); + auto_ptr<DBClientCursor> cursor = conn->query( temp.getNS() , BSON( "ns" << ns ) ); + while ( cursor->more() ){ + BSONObj d = cursor->next(); + if ( d["isMaxMarker"].trueValue() ){ + continue; + } + + Chunk * c = new Chunk( this ); + c->unserialize( d ); + _chunks.push_back( c ); + c->_id = d["_id"].wrap().getOwned(); + } + conn.done(); + + if ( _chunks.size() == 0 ){ + Chunk * c = new Chunk( this ); + c->_ns = ns; + c->setMin(_key.globalMin()); + 
c->setMax(_key.globalMax()); + c->_shard = config->getPrimary(); + c->_markModified(); + + _chunks.push_back( c ); + + log() << "no chunks for:" << ns << " so creating first: " << c->toString() << endl; + } + + _sequenceNumber = ++NextSequenceNumber; + } + + ChunkManager::~ChunkManager(){ + for ( vector<Chunk*>::iterator i=_chunks.begin(); i != _chunks.end(); i++ ){ + delete( *i ); + } + _chunks.clear(); + } + + bool ChunkManager::hasShardKey( const BSONObj& obj ){ + return _key.hasShardKey( obj ); + } + + Chunk& ChunkManager::findChunk( const BSONObj & obj ){ + + for ( vector<Chunk*>::iterator i=_chunks.begin(); i != _chunks.end(); i++ ){ + Chunk * c = *i; + if ( c->contains( obj ) ) + return *c; + } + stringstream ss; + ss << "couldn't find a chunk which should be impossible extracted: " << _key.extractKey( obj ); + throw UserException( 8070 , ss.str() ); + } + + Chunk* ChunkManager::findChunkOnServer( const string& server ) const { + + for ( vector<Chunk*>::const_iterator i=_chunks.begin(); i!=_chunks.end(); i++ ){ + Chunk * c = *i; + if ( c->getShard() == server ) + return c; + } + + return 0; + } + + int ChunkManager::getChunksForQuery( vector<Chunk*>& chunks , const BSONObj& query ){ + int added = 0; + + for ( vector<Chunk*>::iterator i=_chunks.begin(); i != _chunks.end(); i++ ){ + Chunk * c = *i; + if ( _key.relevantForQuery( query , c ) ){ + chunks.push_back( c ); + added++; + } + } + return added; + } + + void ChunkManager::getAllServers( set<string>& allServers ){ + for ( vector<Chunk*>::iterator i=_chunks.begin(); i != _chunks.end(); i++ ){ + allServers.insert( (*i)->getShard() ); + } + } + + void ChunkManager::ensureIndex(){ + set<string> seen; + + for ( vector<Chunk*>::const_iterator i=_chunks.begin(); i!=_chunks.end(); i++ ){ + Chunk * c = *i; + if ( seen.count( c->getShard() ) ) + continue; + seen.insert( c->getShard() ); + c->ensureIndex(); + } + } + + void ChunkManager::drop(){ + uassert( 10174 , "config servers not all up" , configServer.allUp() 
); + + map<string,ShardChunkVersion> seen; + + log(1) << "ChunkManager::drop : " << _ns << endl; + + // lock all shards so no one can do a split/migrate + for ( vector<Chunk*>::const_iterator i=_chunks.begin(); i!=_chunks.end(); i++ ){ + Chunk * c = *i; + ShardChunkVersion& version = seen[ c->getShard() ]; + if ( version ) + continue; + version = lockNamespaceOnServer( c->getShard() , _ns ); + if ( version ) + continue; + + // rollback + uassert( 10175 , "don't know how to rollback locks b/c drop can't lock all shards" , 0 ); + } + + log(1) << "ChunkManager::drop : " << _ns << "\t all locked" << endl; + + // wipe my meta-data + _chunks.clear(); + + + // delete data from mongod + for ( map<string,ShardChunkVersion>::iterator i=seen.begin(); i!=seen.end(); i++ ){ + string shard = i->first; + ScopedDbConnection conn( shard ); + conn->dropCollection( _ns ); + conn.done(); + } + + log(1) << "ChunkManager::drop : " << _ns << "\t removed shard data" << endl; + + // clean up database meta-data + uassert( 10176 , "no sharding data?" , _config->removeSharding( _ns ) ); + _config->save(); + + + // remove chunk data + Chunk temp(0); + ScopedDbConnection conn( temp.modelServer() ); + conn->remove( temp.getNS() , BSON( "ns" << _ns ) ); + conn.done(); + log(1) << "ChunkManager::drop : " << _ns << "\t removed chunk data" << endl; + + for ( map<string,ShardChunkVersion>::iterator i=seen.begin(); i!=seen.end(); i++ ){ + ScopedDbConnection conn( i->first ); + BSONObj res; + if ( ! setShardVersion( conn.conn() , _ns , 0 , true , res ) ) + throw UserException( 8071 , (string)"OH KNOW, cleaning up after drop failed: " + res.toString() ); + conn.done(); + } + + + log(1) << "ChunkManager::drop : " << _ns << "\t DONE" << endl; + } + + void ChunkManager::save(){ + ShardChunkVersion a = getVersion(); + + set<string> withRealChunks; + + for ( vector<Chunk*>::const_iterator i=_chunks.begin(); i!=_chunks.end(); i++ ){ + Chunk* c = *i; + if ( ! 
c->_modified ) + continue; + c->save( true ); + _sequenceNumber = ++NextSequenceNumber; + + withRealChunks.insert( c->getShard() ); + } + + massert( 10417 , "how did version get smalled" , getVersion() >= a ); + + ensureIndex(); // TODO: this is too aggressive - but not really sooo bad + } + + ShardChunkVersion ChunkManager::getVersion( const string& server ) const{ + // TODO: cache or something? + + ShardChunkVersion max = 0; + + for ( vector<Chunk*>::const_iterator i=_chunks.begin(); i!=_chunks.end(); i++ ){ + Chunk* c = *i; + if ( c->getShard() != server ) + continue; + + if ( c->_lastmod > max ) + max = c->_lastmod; + } + + return max; + } + + ShardChunkVersion ChunkManager::getVersion() const{ + ShardChunkVersion max = 0; + + for ( vector<Chunk*>::const_iterator i=_chunks.begin(); i!=_chunks.end(); i++ ){ + Chunk* c = *i; + if ( c->_lastmod > max ) + max = c->_lastmod; + } + + return max; + } + + string ChunkManager::toString() const { + stringstream ss; + ss << "ChunkManager: " << _ns << " key:" << _key.toString() << "\n"; + for ( vector<Chunk*>::const_iterator i=_chunks.begin(); i!=_chunks.end(); i++ ){ + const Chunk* c = *i; + ss << "\t" << c->toString() << "\n"; + } + return ss.str(); + } + + + class ChunkObjUnitTest : public UnitTest { + public: + void runShard(){ + + } + + void run(){ + runShard(); + log(1) << "shardObjTest passed" << endl; + } + } shardObjTest; + + +} // namespace mongo diff --git a/s/chunk.h b/s/chunk.h new file mode 100644 index 0000000..7395133 --- /dev/null +++ b/s/chunk.h @@ -0,0 +1,221 @@ +// shard.h + +/* + A "shard" is a database (replica pair typically) which represents + one partition of the overall database. +*/ + +/** +* Copyright (C) 2008 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. 
+* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#pragma once + +#include "../stdafx.h" +#include "../client/dbclient.h" +#include "../client/model.h" +#include "shardkey.h" +#include <boost/utility.hpp> +#undef assert +#define assert xassert + +namespace mongo { + + class DBConfig; + class ChunkManager; + class ChunkObjUnitTest; + + typedef unsigned long long ShardChunkVersion; + + /** + config.chunks + { ns : "alleyinsider.fs.chunks" , min : {} , max : {} , server : "localhost:30001" } + + x is in a shard iff + min <= x < max + */ + class Chunk : public Model , boost::noncopyable { + public: + + Chunk( ChunkManager * info ); + + const BSONObj& getMin() const { return _min; } + const BSONObj& getMax() const { return _max; } + + void setMin(const BSONObj& o){ + _min = o; + } + void setMax(const BSONObj& o){ + _max = o; + } + + string getShard(){ + return _shard; + } + void setShard( string shard ); + + bool contains( const BSONObj& obj ); + + string toString() const; + operator string() const { return toString(); } + + bool operator==(const Chunk& s); + + bool operator!=(const Chunk& s){ + return ! 
( *this == s ); + } + + void getFilter( BSONObjBuilder& b ); + BSONObj getFilter(){ BSONObjBuilder b; getFilter( b ); return b.obj(); } + + + BSONObj pickSplitPoint(); + Chunk * split(); + Chunk * split( const BSONObj& middle ); + + /** + * @return size of shard in bytes + * talks to mongod to do this + */ + long getPhysicalSize(); + + long countObjects( const BSONObj& filter = BSONObj() ); + + /** + * if the amount of data written nears the max size of a shard + * then we check the real size, and if its too big, we split + */ + bool splitIfShould( long dataWritten ); + + + /* + * moves either this shard or newShard if it makes sense too + * @return whether or not a shard was moved + */ + bool moveIfShould( Chunk * newShard = 0 ); + + bool moveAndCommit( const string& to , string& errmsg ); + + virtual const char * getNS(){ return "config.chunks"; } + virtual void serialize(BSONObjBuilder& to); + virtual void unserialize(const BSONObj& from); + virtual string modelServer(); + + virtual void save( bool check=false ); + + void ensureIndex(); + + void _markModified(); + + static long MaxChunkSize; + + private: + + // main shard info + + ChunkManager * _manager; + ShardKeyPattern skey(); + + string _ns; + BSONObj _min; + BSONObj _max; + string _shard; + ShardChunkVersion _lastmod; + + bool _modified; + + // transient stuff + + long _dataWritten; + + // methods, etc.. 
+ + void _split( BSONObj& middle ); + + friend class ChunkManager; + friend class ShardObjUnitTest; + }; + + /* config.sharding + { ns: 'alleyinsider.fs.chunks' , + key: { ts : 1 } , + shards: [ { min: 1, max: 100, server: a } , { min: 101, max: 200 , server : b } ] + } + */ + class ChunkManager { + public: + + ChunkManager( DBConfig * config , string ns , ShardKeyPattern pattern , bool unique ); + virtual ~ChunkManager(); + + string getns(){ + return _ns; + } + + int numChunks(){ return _chunks.size(); } + Chunk* getChunk( int i ){ return _chunks[i]; } + bool hasShardKey( const BSONObj& obj ); + + Chunk& findChunk( const BSONObj& obj ); + Chunk* findChunkOnServer( const string& server ) const; + + ShardKeyPattern& getShardKey(){ return _key; } + bool isUnique(){ return _unique; } + + /** + * makes sure the shard index is on all servers + */ + void ensureIndex(); + + /** + * @return number of Chunk added to the vector + */ + int getChunksForQuery( vector<Chunk*>& chunks , const BSONObj& query ); + + void getAllServers( set<string>& allServers ); + + void save(); + + string toString() const; + operator string() const { return toString(); } + + ShardChunkVersion getVersion( const string& server ) const; + ShardChunkVersion getVersion() const; + + /** + * this is just an increasing number of how many ChunkManagers we have so we know if something has been updated + */ + unsigned long long getSequenceNumber(){ + return _sequenceNumber; + } + + void drop(); + + private: + DBConfig * _config; + string _ns; + ShardKeyPattern _key; + bool _unique; + + vector<Chunk*> _chunks; + map<string,unsigned long long> _maxMarkers; + + unsigned long long _sequenceNumber; + + friend class Chunk; + static unsigned long long NextSequenceNumber; + }; + +} // namespace mongo diff --git a/s/commands_admin.cpp b/s/commands_admin.cpp new file mode 100644 index 0000000..e79b529 --- /dev/null +++ b/s/commands_admin.cpp @@ -0,0 +1,696 @@ +// s/commands_admin.cpp + +/** +* Copyright (C) 2008 10gen 
Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +/* TODO + _ concurrency control. + _ limit() works right? + _ KillCursors + + later + _ secondary indexes +*/ + +#include "stdafx.h" +#include "../util/message.h" +#include "../db/dbmessage.h" +#include "../client/connpool.h" +#include "../db/commands.h" + +#include "config.h" +#include "chunk.h" +#include "strategy.h" + +namespace mongo { + + extern string ourHostname; + + namespace dbgrid_cmds { + + set<string> dbgridCommands; + + class GridAdminCmd : public Command { + public: + GridAdminCmd( const char * n ) : Command( n ){ + dbgridCommands.insert( n ); + } + virtual bool slaveOk(){ + return true; + } + virtual bool adminOnly() { + return true; + } + }; + + // --------------- misc commands ---------------------- + + class NetStatCmd : public GridAdminCmd { + public: + NetStatCmd() : GridAdminCmd("netstat") { } + virtual void help( stringstream& help ) const { + help << " shows status/reachability of servers in the cluster"; + } + bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ + result.append("configserver", configServer.getPrimary() ); + result.append("isdbgrid", 1); + return true; + } + } netstat; + + class ListGridCommands : public GridAdminCmd { + public: + ListGridCommands() : GridAdminCmd("gridcommands") { } + bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ + + 
BSONObjBuilder arr; + int num=0; + for ( set<string>::iterator i = dbgridCommands.begin(); i != dbgridCommands.end(); i++ ){ + string s = BSONObjBuilder::numStr( num++ ); + arr.append( s.c_str() , *i ); + } + + result.appendArray( "commands" , arr.done() ); + return true; + } + } listGridCommands; + + // ------------ database level commands ------------- + + class ListDatabaseCommand : public GridAdminCmd { + public: + ListDatabaseCommand() : GridAdminCmd("listdatabases") { } + bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ + ScopedDbConnection conn( configServer.getPrimary() ); + + auto_ptr<DBClientCursor> cursor = conn->query( "config.databases" , BSONObj() ); + + BSONObjBuilder list; + int num = 0; + while ( cursor->more() ){ + string s = BSONObjBuilder::numStr( num++ ); + + BSONObj o = cursor->next(); + list.append( s.c_str() , o["name"].valuestrsafe() ); + } + + result.appendArray("databases" , list.obj() ); + conn.done(); + + return true; + } + } gridListDatabase; + + class MoveDatabasePrimaryCommand : public GridAdminCmd { + public: + MoveDatabasePrimaryCommand() : GridAdminCmd("moveprimary") { } + virtual void help( stringstream& help ) const { + help << " example: { moveprimary : 'foo' , to : 'localhost:9999' } TODO: locking? "; + } + bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ + string dbname = cmdObj["moveprimary"].valuestrsafe(); + + if ( dbname.size() == 0 ){ + errmsg = "no db"; + return false; + } + + if ( dbname == "config" ){ + errmsg = "can't move config db"; + return false; + } + + DBConfig * config = grid.getDBConfig( dbname , false ); + if ( ! config ){ + errmsg = "can't find db!"; + return false; + } + + string to = cmdObj["to"].valuestrsafe(); + if ( ! to.size() ){ + errmsg = "you have to specify where you want to move it"; + return false; + } + + if ( to == config->getPrimary() ){ + errmsg = "thats already the primary"; + return false; + } + + if ( ! 
grid.knowAboutShard( to ) ){ + errmsg = "that server isn't known to me"; + return false; + } + + ScopedDbConnection conn( configServer.getPrimary() ); + + log() << "moving " << dbname << " primary from: " << config->getPrimary() << " to: " << to << endl; + + // TODO LOCKING: this is not safe with multiple mongos + + + ScopedDbConnection toconn( to ); + + // TODO AARON - we need a clone command which replays operations from clone start to now + // using a seperate smaller oplog + BSONObj cloneRes; + bool worked = toconn->runCommand( dbname.c_str() , BSON( "clone" << config->getPrimary() ) , cloneRes ); + toconn.done(); + if ( ! worked ){ + log() << "clone failed" << cloneRes << endl; + errmsg = "clone failed"; + conn.done(); + return false; + } + + ScopedDbConnection fromconn( config->getPrimary() ); + + config->setPrimary( to ); + config->save( true ); + + log() << " dropping " << dbname << " from old" << endl; + + fromconn->dropDatabase( dbname.c_str() ); + fromconn.done(); + + result << "primary" << to; + + conn.done(); + return true; + } + } movePrimary; + + class EnableShardingCmd : public GridAdminCmd { + public: + EnableShardingCmd() : GridAdminCmd( "enablesharding" ){} + virtual void help( stringstream& help ) const { + help + << "Enable sharding for a db. 
(Use 'shardcollection' command afterwards.)\n" + << " { enablesharding : \"<dbname>\" }\n"; + } + bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ + string dbname = cmdObj["enablesharding"].valuestrsafe(); + if ( dbname.size() == 0 ){ + errmsg = "no db"; + return false; + } + + DBConfig * config = grid.getDBConfig( dbname ); + if ( config->isShardingEnabled() ){ + errmsg = "already enabled"; + return false; + } + + log() << "enabling sharding on: " << dbname << endl; + + config->enableSharding(); + config->save( true ); + + return true; + } + } enableShardingCmd; + + // ------------ collection level commands ------------- + + class ShardCollectionCmd : public GridAdminCmd { + public: + ShardCollectionCmd() : GridAdminCmd( "shardcollection" ){} + virtual void help( stringstream& help ) const { + help + << "Shard a collection. Requires key. Optional unique. Sharding must already be enabled for the database.\n" + << " { enablesharding : \"<dbname>\" }\n"; + } + bool run(const char *cmdns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ + string ns = cmdObj["shardcollection"].valuestrsafe(); + if ( ns.size() == 0 ){ + errmsg = "no ns"; + return false; + } + + DBConfig * config = grid.getDBConfig( ns ); + if ( ! config->isShardingEnabled() ){ + errmsg = "sharding not enabled for db"; + return false; + } + + if ( config->isSharded( ns ) ){ + errmsg = "already sharded"; + return false; + } + + BSONObj key = cmdObj.getObjectField( "key" ); + if ( key.isEmpty() ){ + errmsg = "no shard key"; + return false; + } else if (key.nFields() > 1){ + errmsg = "compound shard keys not supported yet"; + return false; + } + + if ( ns.find( ".system." 
) != string::npos ){ + errmsg = "can't shard system namespaces"; + return false; + } + + { + ScopedDbConnection conn( config->getPrimary() ); + BSONObjBuilder b; + b.append( "ns" , ns ); + b.appendBool( "unique" , true ); + if ( conn->count( config->getName() + ".system.indexes" , b.obj() ) ){ + errmsg = "can't shard collection with unique indexes"; + conn.done(); + return false; + } + + BSONObj res = conn->findOne( config->getName() + ".system.namespaces" , BSON( "name" << ns ) ); + if ( res["options"].type() == Object && res["options"].embeddedObject()["capped"].trueValue() ){ + errmsg = "can't shard capped collection"; + conn.done(); + return false; + } + + conn.done(); + } + + log() << "CMD: shardcollection: " << cmdObj << endl; + + config->shardCollection( ns , key , cmdObj["unique"].trueValue() ); + config->save( true ); + + result << "collectionsharded" << ns; + return true; + } + } shardCollectionCmd; + + class GetShardVersion : public GridAdminCmd { + public: + GetShardVersion() : GridAdminCmd( "getShardVersion" ){} + virtual void help( stringstream& help ) const { + help << " example: { getShardVersion : 'alleyinsider.foo' } "; + } + + bool run(const char *cmdns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ + string ns = cmdObj["getShardVersion"].valuestrsafe(); + if ( ns.size() == 0 ){ + errmsg = "need to speciy fully namespace"; + return false; + } + + DBConfig * config = grid.getDBConfig( ns ); + if ( ! config->isSharded( ns ) ){ + errmsg = "ns not sharded."; + return false; + } + + ChunkManager * cm = config->getChunkManager( ns ); + if ( ! 
cm ){ + errmsg = "no chunk manager?"; + return false; + } + + result.appendTimestamp( "version" , cm->getVersion() ); + + return 1; + } + } getShardVersionCmd; + + class SplitCollectionHelper : public GridAdminCmd { + public: + SplitCollectionHelper( const char * name ) : GridAdminCmd( name ) , _name( name ){} + virtual void help( stringstream& help ) const { + help + << " example: { shard : 'alleyinsider.blog.posts' , find : { ts : 1 } } - split the shard that contains give key \n" + << " example: { shard : 'alleyinsider.blog.posts' , middle : { ts : 1 } } - split the shard that contains the key with this as the middle \n" + << " NOTE: this does not move move the chunks, it merely creates a logical seperation \n" + ; + } + + virtual bool _split( BSONObjBuilder& result , string&errmsg , const string& ns , ChunkManager * manager , Chunk& old , BSONObj middle ) = 0; + + bool run(const char *cmdns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ + string ns = cmdObj[_name.c_str()].valuestrsafe(); + if ( ns.size() == 0 ){ + errmsg = "no ns"; + return false; + } + + DBConfig * config = grid.getDBConfig( ns ); + if ( ! config->isSharded( ns ) ){ + errmsg = "ns not sharded. 
have to shard before can split"; + return false; + } + + BSONObj find = cmdObj.getObjectField( "find" ); + if ( find.isEmpty() ){ + find = cmdObj.getObjectField( "middle" ); + + if ( find.isEmpty() ){ + errmsg = "need to specify find or middle"; + return false; + } + } + + ChunkManager * info = config->getChunkManager( ns ); + Chunk& old = info->findChunk( find ); + + return _split( result , errmsg , ns , info , old , cmdObj.getObjectField( "middle" ) ); + } + + protected: + string _name; + }; + + class SplitValueCommand : public SplitCollectionHelper { + public: + SplitValueCommand() : SplitCollectionHelper( "splitvalue" ){} + virtual bool _split( BSONObjBuilder& result , string& errmsg , const string& ns , ChunkManager * manager , Chunk& old , BSONObj middle ){ + + result << "shardinfo" << old.toString(); + + result.appendBool( "auto" , middle.isEmpty() ); + + if ( middle.isEmpty() ) + middle = old.pickSplitPoint(); + + result.append( "middle" , middle ); + + return true; + } + + } splitValueCmd; + + + class SplitCollection : public SplitCollectionHelper { + public: + SplitCollection() : SplitCollectionHelper( "split" ){} + virtual bool _split( BSONObjBuilder& result , string& errmsg , const string& ns , ChunkManager * manager , Chunk& old , BSONObj middle ){ + + log() << "splitting: " << ns << " shard: " << old << endl; + + if ( middle.isEmpty() ) + old.split(); + else + old.split( middle ); + + return true; + } + + + } splitCollectionCmd; + + class MoveChunkCmd : public GridAdminCmd { + public: + MoveChunkCmd() : GridAdminCmd( "movechunk" ){} + virtual void help( stringstream& help ) const { + help << "{ movechunk : 'test.foo' , find : { num : 1 } , to : 'localhost:30001' }"; + } + bool run(const char *cmdns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ + string ns = cmdObj["movechunk"].valuestrsafe(); + if ( ns.size() == 0 ){ + errmsg = "no ns"; + return false; + } + + DBConfig * config = grid.getDBConfig( ns ); + if ( ! 
config->isSharded( ns ) ){ + errmsg = "ns not sharded. have to shard before can move a chunk"; + return false; + } + + BSONObj find = cmdObj.getObjectField( "find" ); + if ( find.isEmpty() ){ + errmsg = "need to specify find. see help"; + return false; + } + + string to = cmdObj["to"].valuestrsafe(); + if ( ! to.size() ){ + errmsg = "you have to specify where you want to move the chunk"; + return false; + } + + log() << "CMD: movechunk: " << cmdObj << endl; + + ChunkManager * info = config->getChunkManager( ns ); + Chunk& c = info->findChunk( find ); + string from = c.getShard(); + + if ( from == to ){ + errmsg = "that chunk is already on that shard"; + return false; + } + + if ( ! grid.knowAboutShard( to ) ){ + errmsg = "that shard isn't known to me"; + return false; + } + + if ( ! c.moveAndCommit( to , errmsg ) ) + return false; + + return true; + } + } moveChunkCmd; + + // ------------ server level commands ------------- + + class ListShardsCmd : public GridAdminCmd { + public: + ListShardsCmd() : GridAdminCmd("listshards") { } + virtual void help( stringstream& help ) const { + help << "list all shards of the system"; + } + bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ + ScopedDbConnection conn( configServer.getPrimary() ); + + vector<BSONObj> all; + auto_ptr<DBClientCursor> cursor = conn->query( "config.shards" , BSONObj() ); + while ( cursor->more() ){ + BSONObj o = cursor->next(); + all.push_back( o ); + } + + result.append("shards" , all ); + conn.done(); + + return true; + } + } listShardsCmd; + + /* a shard is a single mongod server or a replica pair. add it (them) to the cluster as a storage partition. 
*/ + class AddShard : public GridAdminCmd { + public: + AddShard() : GridAdminCmd("addshard") { } + virtual void help( stringstream& help ) const { + help << "add a new shard to the system"; + } + bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ + ScopedDbConnection conn( configServer.getPrimary() ); + + + string host = cmdObj["addshard"].valuestrsafe(); + + if ( host == "localhost" || host.find( "localhost:" ) == 0 || + host == "127.0.0.1" || host.find( "127.0.0.1:" ) == 0 ){ + if ( cmdObj["allowLocal"].type() != Bool || + ! cmdObj["allowLocal"].boolean() ){ + errmsg = + "can't use localhost as a shard since all shards need to communicate. " + "allowLocal to override for testing"; + return false; + } + } + + if ( host.find( ":" ) == string::npos ){ + stringstream ss; + ss << host << ":" << CmdLine::ShardServerPort; + host = ss.str(); + } + + BSONObj shard; + { + BSONObjBuilder b; + b.append( "host" , host ); + if ( cmdObj["maxSize"].isNumber() ) + b.append( cmdObj["maxSize"] ); + shard = b.obj(); + } + + BSONObj old = conn->findOne( "config.shards" , shard ); + if ( ! 
old.isEmpty() ){ + result.append( "msg" , "already exists" ); + conn.done(); + return false; + } + + try { + ScopedDbConnection newShardConn( host ); + newShardConn->getLastError(); + newShardConn.done(); + } + catch ( DBException& e ){ + errmsg = "couldn't connect to new shard"; + result.append( "host" , host ); + result.append( "exception" , e.what() ); + conn.done(); + return false; + } + + + + conn->insert( "config.shards" , shard ); + result.append( "added" , shard["host"].valuestrsafe() ); + conn.done(); + return true; + } + } addServer; + + class RemoveShardCmd : public GridAdminCmd { + public: + RemoveShardCmd() : GridAdminCmd("removeshard") { } + virtual void help( stringstream& help ) const { + help << "remove a shard to the system.\nshard must be empty or command will return an error."; + } + bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ + if ( 1 ){ + errmsg = "removeshard not yet implemented"; + return 0; + } + + ScopedDbConnection conn( configServer.getPrimary() ); + + BSONObj server = BSON( "host" << cmdObj["removeshard"].valuestrsafe() ); + conn->remove( "config.shards" , server ); + + conn.done(); + return true; + } + } removeShardCmd; + + + // --------------- public commands ---------------- + + class IsDbGridCmd : public Command { + public: + virtual bool slaveOk() { + return true; + } + IsDbGridCmd() : Command("isdbgrid") { } + bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) { + result.append("isdbgrid", 1); + result.append("hostname", ourHostname); + return true; + } + } isdbgrid; + + class CmdIsMaster : public Command { + public: + virtual bool requiresAuth() { return false; } + virtual bool slaveOk() { + return true; + } + virtual void help( stringstream& help ) const { + help << "test if this is master half of a replica pair"; + } + CmdIsMaster() : Command("ismaster") { } + virtual bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& 
result, bool) { + result.append("ismaster", 1.0 ); + result.append("msg", "isdbgrid"); + return true; + } + } ismaster; + + class CmdShardingGetPrevError : public Command { + public: + virtual bool requiresAuth() { return false; } + virtual bool slaveOk() { + return true; + } + virtual void help( stringstream& help ) const { + help << "get previous error (since last reseterror command)"; + } + CmdShardingGetPrevError() : Command("getpreverror") { } + virtual bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) { + errmsg += "getpreverror not supported for sharded environments"; + return false; + } + } cmdGetPrevError; + + class CmdShardingGetLastError : public Command { + public: + virtual bool requiresAuth() { return false; } + virtual bool slaveOk() { + return true; + } + virtual void help( stringstream& help ) const { + help << "check for an error on the last command executed"; + } + CmdShardingGetLastError() : Command("getlasterror") { } + virtual bool run(const char *nsraw, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) { + string dbName = nsraw; + dbName = dbName.substr( 0 , dbName.size() - 5 ); + + DBConfig * conf = grid.getDBConfig( dbName , false ); + + ClientInfo * client = ClientInfo::get(); + set<string> * shards = client->getPrev(); + + if ( shards->size() == 0 ){ + result.appendNull( "err" ); + return true; + } + + if ( shards->size() == 1 ){ + string theShard = *(shards->begin() ); + result.append( "theshard" , theShard.c_str() ); + ScopedDbConnection conn( theShard ); + BSONObj res; + bool ok = conn->runCommand( conf->getName() , cmdObj , res ); + result.appendElements( res ); + conn.done(); + return ok; + } + + vector<string> errors; + for ( set<string>::iterator i = shards->begin(); i != shards->end(); i++ ){ + string theShard = *i; + ScopedDbConnection conn( theShard ); + string temp = conn->getLastError(); + if ( temp.size() ) + errors.push_back( temp ); + conn.done(); + } + + if ( errors.size() 
== 0 ){ + result.appendNull( "err" ); + return true; + } + + result.append( "err" , errors[0].c_str() ); + + BSONObjBuilder all; + for ( unsigned i=0; i<errors.size(); i++ ){ + all.append( all.numStr( i ).c_str() , errors[i].c_str() ); + } + result.appendArray( "errs" , all.obj() ); + return true; + } + } cmdGetLastError; + + } + +} // namespace mongo diff --git a/s/commands_public.cpp b/s/commands_public.cpp new file mode 100644 index 0000000..2d3de7a --- /dev/null +++ b/s/commands_public.cpp @@ -0,0 +1,368 @@ +// s/commands_public.cpp + + +/** +* Copyright (C) 2008 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. 
+*/ + +#include "stdafx.h" +#include "../util/message.h" +#include "../db/dbmessage.h" +#include "../client/connpool.h" +#include "../client/parallel.h" +#include "../db/commands.h" + +#include "config.h" +#include "chunk.h" +#include "strategy.h" + +namespace mongo { + + namespace dbgrid_pub_cmds { + + class PublicGridCommand : public Command { + public: + PublicGridCommand( const char * n ) : Command( n ){ + } + virtual bool slaveOk(){ + return true; + } + virtual bool adminOnly() { + return false; + } + protected: + string getDBName( string ns ){ + return ns.substr( 0 , ns.size() - 5 ); + } + + bool passthrough( DBConfig * conf, const BSONObj& cmdObj , BSONObjBuilder& result ){ + ScopedDbConnection conn( conf->getPrimary() ); + BSONObj res; + bool ok = conn->runCommand( conf->getName() , cmdObj , res ); + result.appendElements( res ); + conn.done(); + return ok; + } + }; + + class NotAllowedOnShardedCollectionCmd : public PublicGridCommand { + public: + NotAllowedOnShardedCollectionCmd( const char * n ) : PublicGridCommand( n ){} + + virtual string getFullNS( const string& dbName , const BSONObj& cmdObj ) = 0; + + virtual bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ + + string dbName = getDBName( ns ); + string fullns = getFullNS( dbName , cmdObj ); + + DBConfig * conf = grid.getDBConfig( dbName , false ); + + if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ){ + return passthrough( conf , cmdObj , result ); + } + errmsg = "can't do command: " + name + " on sharded collection"; + return false; + } + }; + + // ---- + + class DropCmd : public PublicGridCommand { + public: + DropCmd() : PublicGridCommand( "drop" ){} + bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ + + string dbName = getDBName( ns ); + string collection = cmdObj.firstElement().valuestrsafe(); + string fullns = dbName + "." 
+ collection; + + DBConfig * conf = grid.getDBConfig( dbName , false ); + + log() << "DROP: " << fullns << endl; + + if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ){ + return passthrough( conf , cmdObj , result ); + } + + ChunkManager * cm = conf->getChunkManager( fullns ); + massert( 10418 , "how could chunk manager be null!" , cm ); + + cm->drop(); + + return 1; + } + } dropCmd; + + class DropDBCmd : public PublicGridCommand { + public: + DropDBCmd() : PublicGridCommand( "dropDatabase" ){} + bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ + + BSONElement e = cmdObj.firstElement(); + + if ( ! e.isNumber() || e.number() != 1 ){ + errmsg = "invalid params"; + return 0; + } + + string dbName = getDBName( ns ); + DBConfig * conf = grid.getDBConfig( dbName , false ); + + log() << "DROP DATABASE: " << dbName << endl; + + if ( ! conf || ! conf->isShardingEnabled() ){ + log(1) << " passing though drop database for: " << dbName << endl; + return passthrough( conf , cmdObj , result ); + } + + if ( ! conf->dropDatabase( errmsg ) ) + return false; + + result.append( "dropped" , dbName ); + return true; + } + } dropDBCmd; + + class CountCmd : public PublicGridCommand { + public: + CountCmd() : PublicGridCommand("count") { } + bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ + + string dbName = getDBName( ns ); + string collection = cmdObj.firstElement().valuestrsafe(); + string fullns = dbName + "." + collection; + + BSONObj filter = cmdObj["query"].embeddedObject(); + + DBConfig * conf = grid.getDBConfig( dbName , false ); + + if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ){ + ScopedDbConnection conn( conf->getPrimary() ); + result.append( "n" , (double)conn->count( fullns , filter ) ); + conn.done(); + return true; + } + + ChunkManager * cm = conf->getChunkManager( fullns ); + massert( 10419 , "how could chunk manager be null!" 
, cm ); + + vector<Chunk*> chunks; + cm->getChunksForQuery( chunks , filter ); + + unsigned long long total = 0; + for ( vector<Chunk*>::iterator i = chunks.begin() ; i != chunks.end() ; i++ ){ + Chunk * c = *i; + total += c->countObjects( filter ); + } + + result.append( "n" , (double)total ); + return true; + } + } countCmd; + + class ConvertToCappedCmd : public NotAllowedOnShardedCollectionCmd { + public: + ConvertToCappedCmd() : NotAllowedOnShardedCollectionCmd("convertToCapped"){} + + virtual string getFullNS( const string& dbName , const BSONObj& cmdObj ){ + return dbName + "." + cmdObj.firstElement().valuestrsafe(); + } + + } convertToCappedCmd; + + + class GroupCmd : public NotAllowedOnShardedCollectionCmd { + public: + GroupCmd() : NotAllowedOnShardedCollectionCmd("group"){} + + virtual string getFullNS( const string& dbName , const BSONObj& cmdObj ){ + return dbName + "." + cmdObj.firstElement().embeddedObjectUserCheck()["ns"].valuestrsafe(); + } + + } groupCmd; + + class DistinctCmd : public PublicGridCommand { + public: + DistinctCmd() : PublicGridCommand("distinct"){} + virtual void help( stringstream &help ) const { + help << "{ distinct : 'collection name' , key : 'a.b' }"; + } + bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ + + string dbName = getDBName( ns ); + string collection = cmdObj.firstElement().valuestrsafe(); + string fullns = dbName + "." + collection; + + DBConfig * conf = grid.getDBConfig( dbName , false ); + + if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ){ + return passthrough( conf , cmdObj , result ); + } + + ChunkManager * cm = conf->getChunkManager( fullns ); + massert( 10420 , "how could chunk manager be null!" 
, cm ); + + vector<Chunk*> chunks; + cm->getChunksForQuery( chunks , BSONObj() ); + + set<BSONObj,BSONObjCmp> all; + int size = 32; + + for ( vector<Chunk*>::iterator i = chunks.begin() ; i != chunks.end() ; i++ ){ + Chunk * c = *i; + + ScopedDbConnection conn( c->getShard() ); + BSONObj res; + bool ok = conn->runCommand( conf->getName() , cmdObj , res ); + conn.done(); + + if ( ! ok ){ + result.appendElements( res ); + return false; + } + + BSONObjIterator it( res["values"].embeddedObjectUserCheck() ); + while ( it.more() ){ + BSONElement nxt = it.next(); + BSONObjBuilder temp(32); + temp.appendAs( nxt , "x" ); + all.insert( temp.obj() ); + } + + } + + BSONObjBuilder b( size ); + int n=0; + for ( set<BSONObj,BSONObjCmp>::iterator i = all.begin() ; i != all.end(); i++ ){ + b.appendAs( i->firstElement() , b.numStr( n++ ).c_str() ); + } + + result.appendArray( "values" , b.obj() ); + return true; + } + } disinctCmd; + + class MRCmd : public PublicGridCommand { + public: + MRCmd() : PublicGridCommand( "mapreduce" ){} + + string getTmpName( const string& coll ){ + static int inc = 1; + stringstream ss; + ss << "tmp.mrs." 
<< coll << "_" << time(0) << "_" << inc++; + return ss.str(); + } + + BSONObj fixForShards( const BSONObj& orig , const string& output ){ + BSONObjBuilder b; + BSONObjIterator i( orig ); + while ( i.more() ){ + BSONElement e = i.next(); + string fn = e.fieldName(); + if ( fn == "map" || + fn == "mapreduce" || + fn == "reduce" || + fn == "query" || + fn == "sort" || + fn == "verbose" ){ + b.append( e ); + } + else if ( fn == "keeptemp" || + fn == "out" || + fn == "finalize" ){ + // we don't want to copy these + } + else { + uassert( 10177 , (string)"don't know mr field: " + fn , 0 ); + } + } + b.append( "out" , output ); + return b.obj(); + } + + bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ + Timer t; + + string dbName = getDBName( ns ); + string collection = cmdObj.firstElement().valuestrsafe(); + string fullns = dbName + "." + collection; + + DBConfig * conf = grid.getDBConfig( dbName , false ); + + if ( ! conf || ! conf->isShardingEnabled() || ! 
conf->isSharded( fullns ) ){ + return passthrough( conf , cmdObj , result ); + } + + BSONObjBuilder timingBuilder; + + ChunkManager * cm = conf->getChunkManager( fullns ); + + BSONObj q; + if ( cmdObj["query"].type() == Object ){ + q = cmdObj["query"].embeddedObjectUserCheck(); + } + + vector<Chunk*> chunks; + cm->getChunksForQuery( chunks , q ); + + const string shardedOutputCollection = getTmpName( collection ); + + BSONObj shardedCommand = fixForShards( cmdObj , shardedOutputCollection ); + + BSONObjBuilder finalCmd; + finalCmd.append( "mapreduce.shardedfinish" , cmdObj ); + finalCmd.append( "shardedOutputCollection" , shardedOutputCollection ); + + list< shared_ptr<Future::CommandResult> > futures; + + for ( vector<Chunk*>::iterator i = chunks.begin() ; i != chunks.end() ; i++ ){ + Chunk * c = *i; + futures.push_back( Future::spawnCommand( c->getShard() , dbName , shardedCommand ) ); + } + + BSONObjBuilder shardresults; + for ( list< shared_ptr<Future::CommandResult> >::iterator i=futures.begin(); i!=futures.end(); i++ ){ + shared_ptr<Future::CommandResult> res = *i; + if ( ! res->join() ){ + errmsg = "mongod mr failed: "; + errmsg += res->result().toString(); + return 0; + } + shardresults.append( res->getServer() , res->result() ); + } + + finalCmd.append( "shards" , shardresults.obj() ); + timingBuilder.append( "shards" , t.millis() ); + + Timer t2; + ScopedDbConnection conn( conf->getPrimary() ); + BSONObj finalResult; + if ( ! 
conn->runCommand( dbName , finalCmd.obj() , finalResult ) ){ + errmsg = "final reduce failed: "; + errmsg += finalResult.toString(); + return 0; + } + timingBuilder.append( "final" , t2.millis() ); + + result.appendElements( finalResult ); + result.append( "timeMillis" , t.millis() ); + result.append( "timing" , timingBuilder.obj() ); + + return 1; + } + } mrCmd; + } +} diff --git a/s/config.cpp b/s/config.cpp new file mode 100644 index 0000000..0bfb5a3 --- /dev/null +++ b/s/config.cpp @@ -0,0 +1,535 @@ +// config.cpp + +/** +* Copyright (C) 2008 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "stdafx.h" +#include "../util/message.h" +#include "../util/unittest.h" +#include "../client/connpool.h" +#include "../client/model.h" +#include "../db/pdfile.h" +#include "../db/cmdline.h" + +#include "server.h" +#include "config.h" +#include "chunk.h" + +namespace mongo { + + int ConfigServer::VERSION = 2; + + /* --- DBConfig --- */ + + string DBConfig::modelServer() { + return configServer.modelServer(); + } + + bool DBConfig::isSharded( const string& ns ){ + if ( ! _shardingEnabled ) + return false; + return _sharded.find( ns ) != _sharded.end(); + } + + string DBConfig::getShard( const string& ns ){ + if ( isSharded( ns ) ) + return ""; + + uassert( 10178 , "no primary!" 
, _primary.size() ); + return _primary; + } + + void DBConfig::enableSharding(){ + _shardingEnabled = true; + } + + ChunkManager* DBConfig::shardCollection( const string& ns , ShardKeyPattern fieldsAndOrder , bool unique ){ + if ( ! _shardingEnabled ) + throw UserException( 8042 , "db doesn't have sharding enabled" ); + + ChunkManager * info = _shards[ns]; + if ( info ) + return info; + + if ( isSharded( ns ) ) + throw UserException( 8043 , "already sharded" ); + + log() << "enable sharding on: " << ns << " with shard key: " << fieldsAndOrder << endl; + _sharded[ns] = CollectionInfo( fieldsAndOrder , unique ); + + info = new ChunkManager( this , ns , fieldsAndOrder , unique ); + _shards[ns] = info; + return info; + + } + + bool DBConfig::removeSharding( const string& ns ){ + if ( ! _shardingEnabled ){ + cout << "AAAA" << endl; + return false; + } + + ChunkManager * info = _shards[ns]; + map<string,CollectionInfo>::iterator i = _sharded.find( ns ); + + if ( info == 0 && i == _sharded.end() ){ + cout << "BBBB" << endl; + return false; + } + uassert( 10179 , "_sharded but no info" , info ); + uassert( 10180 , "info but no sharded" , i != _sharded.end() ); + + _sharded.erase( i ); + _shards.erase( ns ); // TODO: clean this up, maybe switch to shared_ptr + return true; + } + + ChunkManager* DBConfig::getChunkManager( const string& ns , bool reload ){ + ChunkManager* m = _shards[ns]; + if ( m && ! 
reload ) + return m; + + uassert( 10181 , (string)"not sharded:" + ns , isSharded( ns ) ); + if ( m && reload ) + log() << "reloading shard info for: " << ns << endl; + m = new ChunkManager( this , ns , _sharded[ ns ].key , _sharded[ns].unique ); + _shards[ns] = m; + return m; + } + + void DBConfig::serialize(BSONObjBuilder& to){ + to.append("name", _name); + to.appendBool("partitioned", _shardingEnabled ); + to.append("primary", _primary ); + + if ( _sharded.size() > 0 ){ + BSONObjBuilder a; + for ( map<string,CollectionInfo>::reverse_iterator i=_sharded.rbegin(); i != _sharded.rend(); i++){ + BSONObjBuilder temp; + temp.append( "key" , i->second.key.key() ); + temp.appendBool( "unique" , i->second.unique ); + a.append( i->first.c_str() , temp.obj() ); + } + to.append( "sharded" , a.obj() ); + } + } + + void DBConfig::unserialize(const BSONObj& from){ + _name = from.getStringField("name"); + _shardingEnabled = from.getBoolField("partitioned"); + _primary = from.getStringField("primary"); + + _sharded.clear(); + BSONObj sharded = from.getObjectField( "sharded" ); + if ( ! 
sharded.isEmpty() ){ + BSONObjIterator i(sharded); + while ( i.more() ){ + BSONElement e = i.next(); + uassert( 10182 , "sharded things have to be objects" , e.type() == Object ); + BSONObj c = e.embeddedObject(); + uassert( 10183 , "key has to be an object" , c["key"].type() == Object ); + _sharded[e.fieldName()] = CollectionInfo( c["key"].embeddedObject() , + c["unique"].trueValue() ); + } + } + } + + void DBConfig::save( bool check ){ + Model::save( check ); + for ( map<string,ChunkManager*>::iterator i=_shards.begin(); i != _shards.end(); i++) + i->second->save(); + } + + bool DBConfig::reload(){ + // TODO: i don't think is 100% correct + return doload(); + } + + bool DBConfig::doload(){ + BSONObjBuilder b; + b.append("name", _name.c_str()); + BSONObj q = b.done(); + return load(q); + } + + bool DBConfig::dropDatabase( string& errmsg ){ + /** + * 1) make sure everything is up + * 2) update config server + * 3) drop and reset sharded collections + * 4) drop and reset primary + * 5) drop everywhere to clean up loose ends + */ + + log() << "DBConfig::dropDatabase: " << _name << endl; + + // 1 + if ( ! configServer.allUp( errmsg ) ){ + log(1) << "\t DBConfig::dropDatabase not all up" << endl; + return 0; + } + + // 2 + grid.removeDB( _name ); + remove( true ); + if ( ! configServer.allUp( errmsg ) ){ + log() << "error removing from config server even after checking!" << endl; + return 0; + } + log(1) << "\t removed entry from config server for: " << _name << endl; + + set<string> allServers; + + // 3 + while ( true ){ + int num; + if ( ! _dropShardedCollections( num , allServers , errmsg ) ) + return 0; + log() << " DBConfig::dropDatabase: " << _name << " dropped sharded collections: " << num << endl; + if ( num == 0 ) + break; + } + + // 4 + { + ScopedDbConnection conn( _primary ); + BSONObj res; + if ( ! 
conn->dropDatabase( _name , &res ) ){ + errmsg = res.toString(); + return 0; + } + conn.done(); + } + + // 5 + for ( set<string>::iterator i=allServers.begin(); i!=allServers.end(); i++ ){ + string s = *i; + ScopedDbConnection conn( s ); + BSONObj res; + if ( ! conn->dropDatabase( _name , &res ) ){ + errmsg = res.toString(); + return 0; + } + conn.done(); + } + + log(1) << "\t dropped primary db for: " << _name << endl; + + return true; + } + + bool DBConfig::_dropShardedCollections( int& num, set<string>& allServers , string& errmsg ){ + num = 0; + set<string> seen; + while ( true ){ + map<string,ChunkManager*>::iterator i = _shards.begin(); + + if ( i == _shards.end() ) + break; + + if ( seen.count( i->first ) ){ + errmsg = "seen a collection twice!"; + return false; + } + + seen.insert( i->first ); + log(1) << "\t dropping sharded collection: " << i->first << endl; + + i->second->getAllServers( allServers ); + i->second->drop(); + + num++; + uassert( 10184 , "_dropShardedCollections too many collections - bailing" , num < 100000 ); + log(2) << "\t\t dropped " << num << " so far" << endl; + } + return true; + } + + /* --- Grid --- */ + + string Grid::pickShardForNewDB(){ + ScopedDbConnection conn( configServer.getPrimary() ); + + // TODO: this is temporary + + vector<string> all; + auto_ptr<DBClientCursor> c = conn->query( "config.shards" , Query() ); + while ( c->more() ){ + BSONObj s = c->next(); + all.push_back( s["host"].valuestrsafe() ); + // look at s["maxSize"] if exists + } + conn.done(); + + if ( all.size() == 0 ) + return ""; + + return all[ rand() % all.size() ]; + } + + bool Grid::knowAboutShard( string name ) const{ + ScopedDbConnection conn( configServer.getPrimary() ); + BSONObj shard = conn->findOne( "config.shards" , BSON( "host" << name ) ); + conn.done(); + return ! shard.isEmpty(); + } + + DBConfig* Grid::getDBConfig( string database , bool create ){ + { + string::size_type i = database.find( "." 
); + if ( i != string::npos ) + database = database.substr( 0 , i ); + } + + if ( database == "config" ) + return &configServer; + + boostlock l( _lock ); + + DBConfig*& cc = _databases[database]; + if ( cc == 0 ){ + cc = new DBConfig( database ); + if ( ! cc->doload() ){ + if ( create ){ + // note here that cc->primary == 0. + log() << "couldn't find database [" << database << "] in config db" << endl; + + if ( database == "admin" ) + cc->_primary = configServer.getPrimary(); + else + cc->_primary = pickShardForNewDB(); + + if ( cc->_primary.size() ){ + cc->save(); + log() << "\t put [" << database << "] on: " << cc->_primary << endl; + } + else { + log() << "\t can't find a shard to put new db on" << endl; + uassert( 10185 , "can't find a shard to put new db on" , 0 ); + } + } + else { + cc = 0; + } + } + + } + + return cc; + } + + void Grid::removeDB( string database ){ + uassert( 10186 , "removeDB expects db name" , database.find( '.' ) == string::npos ); + boostlock l( _lock ); + _databases.erase( database ); + + } + + unsigned long long Grid::getNextOpTime() const { + ScopedDbConnection conn( configServer.getPrimary() ); + + BSONObj result; + massert( 10421 , "getoptime failed" , conn->simpleCommand( "admin" , &result , "getoptime" ) ); + conn.done(); + + return result["optime"]._numberLong(); + } + + /* --- ConfigServer ---- */ + + ConfigServer::ConfigServer() { + _shardingEnabled = false; + _primary = ""; + _name = "grid"; + } + + ConfigServer::~ConfigServer() { + } + + bool ConfigServer::init( vector<string> configHosts ){ + uassert( 10187 , "need configdbs" , configHosts.size() ); + + string hn = getHostName(); + if ( hn.empty() ) { + sleepsecs(5); + dbexit( EXIT_BADOPTIONS ); + } + ourHostname = hn; + + set<string> hosts; + for ( size_t i=0; i<configHosts.size(); i++ ){ + string host = configHosts[i]; + hosts.insert( getHost( host , false ) ); + configHosts[i] = getHost( host , true ); + } + + for ( set<string>::iterator i=hosts.begin(); i!=hosts.end(); 
i++ ){ + string host = *i; + bool ok = false; + for ( int x=0; x<10; x++ ){ + if ( ! hostbyname( host.c_str() ).empty() ){ + ok = true; + break; + } + log() << "can't resolve DNS for [" << host << "] sleeping and trying " << (10-x) << " more times" << endl; + sleepsecs( 10 ); + } + if ( ! ok ) + return false; + } + + uassert( 10188 , "can only hand 1 config db right now" , configHosts.size() == 1 ); + _primary = configHosts[0]; + + return true; + } + + bool ConfigServer::allUp(){ + string errmsg; + return allUp( errmsg ); + } + + bool ConfigServer::allUp( string& errmsg ){ + try { + ScopedDbConnection conn( _primary ); + conn->getLastError(); + conn.done(); + return true; + } + catch ( DBException& ){ + log() << "ConfigServer::allUp : " << _primary << " seems down!" << endl; + errmsg = _primary + " seems down"; + return false; + } + + } + + int ConfigServer::dbConfigVersion(){ + ScopedDbConnection conn( _primary ); + int version = dbConfigVersion( conn.conn() ); + conn.done(); + return version; + } + + int ConfigServer::dbConfigVersion( DBClientBase& conn ){ + auto_ptr<DBClientCursor> c = conn.query( "config.version" , BSONObj() ); + int version = 0; + if ( c->more() ){ + BSONObj o = c->next(); + version = o["version"].numberInt(); + uassert( 10189 , "should only have 1 thing in config.version" , ! 
c->more() ); + } + else { + if ( conn.count( "config.shard" ) || conn.count( "config.databases" ) ){ + version = 1; + } + } + + return version; + } + + int ConfigServer::checkConfigVersion(){ + int cur = dbConfigVersion(); + if ( cur == VERSION ) + return 0; + + if ( cur == 0 ){ + ScopedDbConnection conn( _primary ); + conn->insert( "config.version" , BSON( "version" << VERSION ) ); + pool.flush(); + assert( VERSION == dbConfigVersion( conn.conn() ) ); + conn.done(); + return 0; + } + + log() << "don't know how to upgrade " << cur << " to " << VERSION << endl; + return -8; + } + + string ConfigServer::getHost( string name , bool withPort ){ + if ( name.find( ":" ) ){ + if ( withPort ) + return name; + return name.substr( 0 , name.find( ":" ) ); + } + + if ( withPort ){ + stringstream ss; + ss << name << ":" << CmdLine::ConfigServerPort; + return ss.str(); + } + + return name; + } + + ConfigServer configServer; + Grid grid; + + + class DBConfigUnitTest : public UnitTest { + public: + void testInOut( DBConfig& c , BSONObj o ){ + c.unserialize( o ); + BSONObjBuilder b; + c.serialize( b ); + + BSONObj out = b.obj(); + + if ( o.toString() == out.toString() ) + return; + + log() << "DBConfig serialization broken\n" + << "in : " << o.toString() << "\n" + << "out : " << out.toString() + << endl; + assert(0); + } + + void a(){ + BSONObjBuilder b; + b << "name" << "abc"; + b.appendBool( "partitioned" , true ); + b << "primary" << "myserver"; + + DBConfig c; + testInOut( c , b.obj() ); + } + + void b(){ + BSONObjBuilder b; + b << "name" << "abc"; + b.appendBool( "partitioned" , true ); + b << "primary" << "myserver"; + + BSONObjBuilder a; + a << "abc.foo" << fromjson( "{ 'key' : { 'a' : 1 } , 'unique' : false }" ); + a << "abc.bar" << fromjson( "{ 'key' : { 'kb' : -1 } , 'unique' : true }" ); + + b.appendArray( "sharded" , a.obj() ); + + DBConfig c; + testInOut( c , b.obj() ); + assert( c.isSharded( "abc.foo" ) ); + assert( ! 
c.isSharded( "abc.food" ) ); + } + + void run(){ + a(); + b(); + } + + } dbConfigUnitTest; +} diff --git a/s/config.h b/s/config.h new file mode 100644 index 0000000..16aa67a --- /dev/null +++ b/s/config.h @@ -0,0 +1,195 @@ +// config.h + +/** +* Copyright (C) 2008 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +/* This file is things related to the "grid configuration": + - what machines make up the db component of our cloud + - where various ranges of things live +*/ + +#pragma once + +#include "../db/namespace.h" +#include "../client/dbclient.h" +#include "../client/model.h" +#include "shardkey.h" + +namespace mongo { + + class Grid; + class ConfigServer; + + extern ConfigServer configServer; + extern Grid grid; + + class ChunkManager; + + class CollectionInfo { + public: + CollectionInfo( ShardKeyPattern _key = BSONObj() , bool _unique = false ) : + key( _key ) , unique( _unique ){} + + ShardKeyPattern key; + bool unique; + }; + + /** + * top level grid configuration for an entire database + * TODO: use shared_ptr for ChunkManager + */ + class DBConfig : public Model { + public: + DBConfig( string name = "" ) : _name( name ) , _primary("") , _shardingEnabled(false){ } + + string getName(){ return _name; }; + + /** + * @return if anything in this db is partitioned or not + */ + bool isShardingEnabled(){ + return _shardingEnabled; + } + + void enableSharding(); + ChunkManager* shardCollection( const 
string& ns , ShardKeyPattern fieldsAndOrder , bool unique ); + + /** + * @return whether or not this partition is partitioned + */ + bool isSharded( const string& ns ); + + ChunkManager* getChunkManager( const string& ns , bool reload = false ); + + /** + * @return the correct for shard for the ns + * if the namespace is sharded, will return an empty string + */ + string getShard( const string& ns ); + + string getPrimary(){ + if ( _primary.size() == 0 ) + throw UserException( 8041 , (string)"no primary shard configured for db: " + _name ); + return _primary; + } + + void setPrimary( string s ){ + _primary = s; + } + + bool reload(); + + bool dropDatabase( string& errmsg ); + + virtual void save( bool check=true); + + virtual string modelServer(); + + // model stuff + + virtual const char * getNS(){ return "config.databases"; } + virtual void serialize(BSONObjBuilder& to); + virtual void unserialize(const BSONObj& from); + + protected: + + bool _dropShardedCollections( int& num, set<string>& allServers , string& errmsg ); + + bool doload(); + + /** + @return true if there was sharding info to remove + */ + bool removeSharding( const string& ns ); + + string _name; // e.g. "alleyinsider" + string _primary; // e.g. localhost , mongo.foo.com:9999 + bool _shardingEnabled; + + map<string,CollectionInfo> _sharded; // { "alleyinsider.blog.posts" : { ts : 1 } , ... ] - all ns that are sharded + map<string,ChunkManager*> _shards; // this will only have entries for things that have been looked at + + friend class Grid; + friend class ChunkManager; + }; + + /** + * stores meta-information about the grid + * TODO: used shard_ptr for DBConfig pointers + */ + class Grid { + public: + /** + gets the config the db. + will return an empty DBConfig if not in db already + */ + DBConfig * getDBConfig( string ns , bool create=true); + + /** + * removes db entry. 
+ * on next getDBConfig call will fetch from db + */ + void removeDB( string db ); + + string pickShardForNewDB(); + + bool knowAboutShard( string name ) const; + + unsigned long long getNextOpTime() const; + private: + map<string,DBConfig*> _databases; + boost::mutex _lock; // TODO: change to r/w lock + }; + + class ConfigServer : public DBConfig { + public: + + ConfigServer(); + ~ConfigServer(); + + bool ok(){ + // TODO: check can connect + return _primary.size() > 0; + } + + virtual string modelServer(){ + uassert( 10190 , "ConfigServer not setup" , _primary.size() ); + return _primary; + } + + /** + call at startup, this will initiate connection to the grid db + */ + bool init( vector<string> configHosts ); + + bool allUp(); + bool allUp( string& errmsg ); + + int dbConfigVersion(); + int dbConfigVersion( DBClientBase& conn ); + + /** + * @return 0 = ok, otherwise error # + */ + int checkConfigVersion(); + + static int VERSION; + + private: + string getHost( string name , bool withPort ); + }; + +} // namespace mongo diff --git a/s/cursors.cpp b/s/cursors.cpp new file mode 100644 index 0000000..23b8eaf --- /dev/null +++ b/s/cursors.cpp @@ -0,0 +1,104 @@ +// cursors.cpp + +#include "stdafx.h" +#include "cursors.h" +#include "../client/connpool.h" +#include "../db/queryutil.h" + +namespace mongo { + + // -------- ShardedCursor ----------- + + ShardedClientCursor::ShardedClientCursor( QueryMessage& q , ClusteredCursor * cursor ){ + assert( cursor ); + _cursor = cursor; + + _skip = q.ntoskip; + _ntoreturn = q.ntoreturn; + + _totalSent = 0; + _done = false; + + do { + // TODO: only create _id when needed + _id = security.getNonce(); + } while ( _id == 0 ); + + } + + ShardedClientCursor::~ShardedClientCursor(){ + assert( _cursor ); + delete _cursor; + _cursor = 0; + } + + bool ShardedClientCursor::sendNextBatch( Request& r , int ntoreturn ){ + uassert( 10191 , "cursor already done" , ! 
_done ); + + int maxSize = 1024 * 1024; + if ( _totalSent > 0 ) + maxSize *= 3; + + BufBuilder b(32768); + + int num = 0; + bool sendMore = true; + + while ( _cursor->more() ){ + BSONObj o = _cursor->next(); + + b.append( (void*)o.objdata() , o.objsize() ); + num++; + + if ( b.len() > maxSize ){ + break; + } + + if ( num == ntoreturn ){ + // soft limit aka batch size + break; + } + + if ( ntoreturn != 0 && ( -1 * num + _totalSent ) == ntoreturn ){ + // hard limit - total to send + sendMore = false; + break; + } + } + + bool hasMore = sendMore && _cursor->more(); + log(6) << "\t hasMore:" << hasMore << " wouldSendMoreIfHad: " << sendMore << " id:" << _id << " totalSent: " << _totalSent << endl; + + replyToQuery( 0 , r.p() , r.m() , b.buf() , b.len() , num , _totalSent , hasMore ? _id : 0 ); + _totalSent += num; + _done = ! hasMore; + + return hasMore; + } + + + CursorCache::CursorCache(){ + } + + CursorCache::~CursorCache(){ + // TODO: delete old cursors? + } + + ShardedClientCursor* CursorCache::get( long long id ){ + map<long long,ShardedClientCursor*>::iterator i = _cursors.find( id ); + if ( i == _cursors.end() ){ + OCCASIONALLY log() << "Sharded CursorCache missing cursor id: " << id << endl; + return 0; + } + return i->second; + } + + void CursorCache::store( ShardedClientCursor * cursor ){ + _cursors[cursor->getId()] = cursor; + } + void CursorCache::remove( long long id ){ + _cursors.erase( id ); + } + + CursorCache cursorCache; +} diff --git a/s/cursors.h b/s/cursors.h new file mode 100644 index 0000000..b1ed4b0 --- /dev/null +++ b/s/cursors.h @@ -0,0 +1,56 @@ +// cursors.h + +#pragma once + +#include "../stdafx.h" + +#include "../db/jsobj.h" +#include "../db/dbmessage.h" +#include "../client/dbclient.h" +#include "../client/parallel.h" + +#include "request.h" + +namespace mongo { + + class ShardedClientCursor { + public: + ShardedClientCursor( QueryMessage& q , ClusteredCursor * cursor ); + virtual ~ShardedClientCursor(); + + long long getId(){ return _id; 
} + + /** + * @return whether there is more data left + */ + bool sendNextBatch( Request& r ){ return sendNextBatch( r , _ntoreturn ); } + bool sendNextBatch( Request& r , int ntoreturn ); + + protected: + + ClusteredCursor * _cursor; + + int _skip; + int _ntoreturn; + + int _totalSent; + bool _done; + + long long _id; + }; + + class CursorCache { + public: + CursorCache(); + ~CursorCache(); + + ShardedClientCursor * get( long long id ); + void store( ShardedClientCursor* cursor ); + void remove( long long id ); + + private: + map<long long,ShardedClientCursor*> _cursors; + }; + + extern CursorCache cursorCache; +} diff --git a/s/d_logic.cpp b/s/d_logic.cpp new file mode 100644 index 0000000..cc627eb --- /dev/null +++ b/s/d_logic.cpp @@ -0,0 +1,542 @@ +// d_logic.cpp + +/** +* Copyright (C) 2008 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. 
+*/ + + +/** + these are commands that live in mongod + mostly around shard management and checking + */ + +#include "stdafx.h" +#include <map> +#include <string> + +#include "../db/commands.h" +#include "../db/jsobj.h" +#include "../db/dbmessage.h" + +#include "../client/connpool.h" + +#include "../util/queue.h" + +using namespace std; + +namespace mongo { + + typedef map<string,unsigned long long> NSVersions; + + NSVersions globalVersions; + boost::thread_specific_ptr<NSVersions> clientShardVersions; + + string shardConfigServer; + + boost::thread_specific_ptr<OID> clientServerIds; + map< string , BlockingQueue<BSONObj>* > clientQueues; + + unsigned long long getVersion( BSONElement e , string& errmsg ){ + if ( e.eoo() ){ + errmsg = "no version"; + return 0; + } + + if ( e.isNumber() ) + return (unsigned long long)e.number(); + + if ( e.type() == Date || e.type() == Timestamp ) + return e._numberLong(); + + + errmsg = "version is not a numberic type"; + return 0; + } + + class MongodShardCommand : public Command { + public: + MongodShardCommand( const char * n ) : Command( n ){ + } + virtual bool slaveOk(){ + return false; + } + virtual bool adminOnly() { + return true; + } + }; + + class WriteBackCommand : public MongodShardCommand { + public: + WriteBackCommand() : MongodShardCommand( "writebacklisten" ){} + bool run(const char *cmdns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ + + BSONElement e = cmdObj.firstElement(); + if ( e.type() != jstOID ){ + errmsg = "need oid as first value"; + return 0; + } + + const OID id = e.__oid(); + + dbtemprelease unlock; + + if ( ! 
clientQueues[id.str()] ) + clientQueues[id.str()] = new BlockingQueue<BSONObj>(); + + BSONObj z = clientQueues[id.str()]->blockingPop(); + log(1) << "WriteBackCommand got : " << z << endl; + + result.append( "data" , z ); + + return true; + } + } writeBackCommand; + + // setShardVersion( ns ) + + class SetShardVersion : public MongodShardCommand { + public: + SetShardVersion() : MongodShardCommand("setShardVersion"){} + + virtual void help( stringstream& help ) const { + help << " example: { setShardVersion : 'alleyinsider.foo' , version : 1 , configdb : '' } "; + } + + bool run(const char *cmdns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ + + bool authoritative = cmdObj.getBoolField( "authoritative" ); + + string configdb = cmdObj["configdb"].valuestrsafe(); + { // configdb checking + if ( configdb.size() == 0 ){ + errmsg = "no configdb"; + return false; + } + + if ( shardConfigServer.size() == 0 ){ + if ( ! authoritative ){ + result.appendBool( "need_authoritative" , true ); + errmsg = "first setShardVersion"; + return false; + } + shardConfigServer = configdb; + } + else if ( shardConfigServer != configdb ){ + errmsg = "specified a different configdb!"; + return false; + } + } + + { // setting up ids + if ( cmdObj["serverID"].type() != jstOID ){ + // TODO: fix this + //errmsg = "need serverID to be an OID"; + //return 0; + } + else { + OID clientId = cmdObj["serverID"].__oid(); + if ( ! clientServerIds.get() ){ + string s = clientId.str(); + + OID * nid = new OID(); + nid->init( s ); + clientServerIds.reset( nid ); + + if ( ! clientQueues[s] ) + clientQueues[s] = new BlockingQueue<BSONObj>(); + } + else if ( clientId != *clientServerIds.get() ){ + errmsg = "server id has changed!"; + return 0; + } + } + } + + unsigned long long version = getVersion( cmdObj["version"] , errmsg ); + if ( errmsg.size() ){ + return false; + } + + NSVersions * versions = clientShardVersions.get(); + + if ( ! 
versions ){ + log(1) << "entering shard mode for connection" << endl; + versions = new NSVersions(); + clientShardVersions.reset( versions ); + } + + string ns = cmdObj["setShardVersion"].valuestrsafe(); + if ( ns.size() == 0 ){ + errmsg = "need to speciy fully namespace"; + return false; + } + + unsigned long long& oldVersion = (*versions)[ns]; + unsigned long long& globalVersion = globalVersions[ns]; + + if ( version == 0 && globalVersion == 0 ){ + // this connection is cleaning itself + oldVersion = 0; + return 1; + } + + if ( version == 0 && globalVersion > 0 ){ + if ( ! authoritative ){ + result.appendBool( "need_authoritative" , true ); + result.appendTimestamp( "globalVersion" , globalVersion ); + result.appendTimestamp( "oldVersion" , oldVersion ); + errmsg = "dropping needs to be authoritative"; + return 0; + } + log() << "wiping data for: " << ns << endl; + result.appendTimestamp( "beforeDrop" , globalVersion ); + // only setting global version on purpose + // need clients to re-find meta-data + globalVersion = 0; + oldVersion = 0; + return 1; + } + + if ( version < oldVersion ){ + errmsg = "you already have a newer version"; + result.appendTimestamp( "oldVersion" , oldVersion ); + result.appendTimestamp( "newVersion" , version ); + return false; + } + + if ( version < globalVersion ){ + errmsg = "going to older version for global"; + return false; + } + + if ( globalVersion == 0 && ! 
cmdObj.getBoolField( "authoritative" ) ){ + // need authoritative for first look + result.appendBool( "need_authoritative" , true ); + result.append( "ns" , ns ); + errmsg = "first time for this ns"; + return false; + } + + result.appendTimestamp( "oldVersion" , oldVersion ); + oldVersion = version; + globalVersion = version; + + result.append( "ok" , 1 ); + return 1; + } + + } setShardVersion; + + class GetShardVersion : public MongodShardCommand { + public: + GetShardVersion() : MongodShardCommand("getShardVersion"){} + + virtual void help( stringstream& help ) const { + help << " example: { getShardVersion : 'alleyinsider.foo' } "; + } + + bool run(const char *cmdns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ + string ns = cmdObj["getShardVersion"].valuestrsafe(); + if ( ns.size() == 0 ){ + errmsg = "need to speciy fully namespace"; + return false; + } + + result.append( "configServer" , shardConfigServer.c_str() ); + + result.appendTimestamp( "global" , globalVersions[ns] ); + if ( clientShardVersions.get() ) + result.appendTimestamp( "mine" , (*clientShardVersions.get())[ns] ); + else + result.appendTimestamp( "mine" , 0 ); + + return true; + } + + } getShardVersion; + + class MoveShardStartCommand : public MongodShardCommand { + public: + MoveShardStartCommand() : MongodShardCommand( "movechunk.start" ){} + virtual void help( stringstream& help ) const { + help << "should not be calling this directly" << endl; + } + + bool run(const char *cmdns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ + // so i have to start clone, tell caller its ok to make change + // at this point the caller locks me, and updates config db + // then finish calls finish, and then deletes data when cursors are done + + string ns = cmdObj["movechunk.start"].valuestrsafe(); + string to = cmdObj["to"].valuestrsafe(); + string from = cmdObj["from"].valuestrsafe(); // my public address, a tad redundant, but safe + BSONObj filter = 
cmdObj.getObjectField( "filter" ); + + if ( ns.size() == 0 ){ + errmsg = "need to specify namespace in command"; + return false; + } + + if ( to.size() == 0 ){ + errmsg = "need to specify server to move shard to"; + return false; + } + if ( from.size() == 0 ){ + errmsg = "need to specify server to move shard from (redundat i know)"; + return false; + } + + if ( filter.isEmpty() ){ + errmsg = "need to specify a filter"; + return false; + } + + log() << "got movechunk.start: " << cmdObj << endl; + + + BSONObj res; + bool ok; + + { + dbtemprelease unlock; + + ScopedDbConnection conn( to ); + ok = conn->runCommand( "admin" , + BSON( "startCloneCollection" << ns << + "from" << from << + "query" << filter + ) , + res ); + conn.done(); + } + + log() << " movechunk.start res: " << res << endl; + + if ( ok ){ + result.append( res["finishToken"] ); + } + else { + errmsg = "startCloneCollection failed: "; + errmsg += res["errmsg"].valuestrsafe(); + } + return ok; + } + + } moveShardStartCmd; + + class MoveShardFinishCommand : public MongodShardCommand { + public: + MoveShardFinishCommand() : MongodShardCommand( "movechunk.finish" ){} + virtual void help( stringstream& help ) const { + help << "should not be calling this directly" << endl; + } + + bool run(const char *cmdns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ + // see MoveShardStartCommand::run + + string ns = cmdObj["movechunk.finish"].valuestrsafe(); + if ( ns.size() == 0 ){ + errmsg = "need ns as cmd value"; + return false; + } + + string to = cmdObj["to"].valuestrsafe(); + if ( to.size() == 0 ){ + errmsg = "need to specify server to move shard to"; + return false; + } + + + unsigned long long newVersion = getVersion( cmdObj["newVersion"] , errmsg ); + if ( newVersion == 0 ){ + errmsg = "have to specify new version number"; + return false; + } + + BSONObj finishToken = cmdObj.getObjectField( "finishToken" ); + if ( finishToken.isEmpty() ){ + errmsg = "need finishToken"; + return false; + } + + 
if ( ns != finishToken["collection"].valuestrsafe() ){ + errmsg = "namespaced don't match"; + return false; + } + + // now we're locked + globalVersions[ns] = newVersion; + NSVersions * versions = clientShardVersions.get(); + if ( ! versions ){ + versions = new NSVersions(); + clientShardVersions.reset( versions ); + } + (*versions)[ns] = newVersion; + + BSONObj res; + bool ok; + + { + dbtemprelease unlock; + + ScopedDbConnection conn( to ); + ok = conn->runCommand( "admin" , + BSON( "finishCloneCollection" << finishToken ) , + res ); + conn.done(); + } + + if ( ! ok ){ + // uh oh + errmsg = "finishCloneCollection failed!"; + result << "finishError" << res; + return false; + } + + // wait until cursors are clean + cout << "WARNING: deleting data before ensuring no more cursors TODO" << endl; + + dbtemprelease unlock; + + DBDirectClient client; + BSONObj removeFilter = finishToken.getObjectField( "query" ); + client.remove( ns , removeFilter ); + + return true; + } + + } moveShardFinishCmd; + + bool haveLocalShardingInfo( const string& ns ){ + if ( shardConfigServer.empty() ) + return false; + + + unsigned long long version = globalVersions[ns]; + if ( version == 0 ) + return false; + + NSVersions * versions = clientShardVersions.get(); + if ( ! versions ) + return false; + + return true; + } + + /** + * @ return true if not in sharded mode + or if version for this client is ok + */ + bool shardVersionOk( const string& ns , string& errmsg ){ + if ( shardConfigServer.empty() ){ + return true; + } + + NSVersions::iterator i = globalVersions.find( ns ); + if ( i == globalVersions.end() ) + return true; + + NSVersions * versions = clientShardVersions.get(); + if ( ! 
versions ){ + // this means the client has nothing sharded + // so this allows direct connections to do whatever they want + // which i think is the correct behavior + return true; + } + + unsigned long long clientVersion = (*versions)[ns]; + unsigned long long version = i->second; + + if ( version == 0 && clientVersion > 0 ){ + stringstream ss; + ss << "version: " << version << " clientVersion: " << clientVersion; + errmsg = ss.str(); + return false; + } + + if ( clientVersion >= version ) + return true; + + + if ( clientVersion == 0 ){ + errmsg = "client in sharded mode, but doesn't have version set for this collection"; + return false; + } + + errmsg = (string)"your version is too old ns: " + ns; + return false; + } + + + bool handlePossibleShardedMessage( Message &m, DbResponse &dbresponse ){ + + if ( shardConfigServer.empty() ){ + return false; + } + + int op = m.data->operation(); + if ( op < 2000 || op >= 3000 ) + return false; + + + const char *ns = m.data->_data + 4; + string errmsg; + if ( shardVersionOk( ns , errmsg ) ){ + return false; + } + + log() << "shardVersionOk failed ns:" << ns << " " << errmsg << endl; + + if ( doesOpGetAResponse( op ) ){ + BufBuilder b( 32768 ); + b.skip( sizeof( QueryResult ) ); + { + BSONObj obj = BSON( "$err" << errmsg ); + b.append( obj.objdata() , obj.objsize() ); + } + + QueryResult *qr = (QueryResult*)b.buf(); + qr->_resultFlags() = QueryResult::ResultFlag_ErrSet | QueryResult::ResultFlag_ShardConfigStale; + qr->len = b.len(); + qr->setOperation( opReply ); + qr->cursorId = 0; + qr->startingFrom = 0; + qr->nReturned = 1; + b.decouple(); + + Message * resp = new Message(); + resp->setData( qr , true ); + + dbresponse.response = resp; + dbresponse.responseTo = m.data->id; + return true; + } + + OID * clientID = clientServerIds.get(); + massert( 10422 , "write with bad shard config and no server id!" 
, clientID ); + + log() << "got write with an old config - writing back" << endl; + + BSONObjBuilder b; + b.appendBool( "writeBack" , true ); + b.append( "ns" , ns ); + b.appendBinData( "msg" , m.data->len , bdtCustom , (char*)(m.data) ); + log() << "writing back msg with len: " << m.data->len << " op: " << m.data->_operation << endl; + clientQueues[clientID->str()]->push( b.obj() ); + + return true; + } + +} diff --git a/s/d_logic.h b/s/d_logic.h new file mode 100644 index 0000000..3e483c4 --- /dev/null +++ b/s/d_logic.h @@ -0,0 +1,23 @@ +// d_logic.h + +#pragma once + +#include "../stdafx.h" + +namespace mongo { + + /** + * @return true if we have any shard info for the ns + */ + bool haveLocalShardingInfo( const string& ns ); + + /** + * @return true if the current threads shard version is ok, or not in sharded version + */ + bool shardVersionOk( const string& ns , string& errmsg ); + + /** + * @return true if we took care of the message and nothing else should be done + */ + bool handlePossibleShardedMessage( Message &m, DbResponse &dbresponse ); +} diff --git a/s/d_util.cpp b/s/d_util.cpp new file mode 100644 index 0000000..8c30d2e --- /dev/null +++ b/s/d_util.cpp @@ -0,0 +1,36 @@ +// util.cpp + +/** +* Copyright (C) 2008 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. 
+*/ + + +/** + these are commands that live in mongod + mostly around shard management and checking + */ + +#include "stdafx.h" +#include "util.h" + +using namespace std; + +namespace mongo { + + void checkShardVersion( DBClientBase & conn , const string& ns , bool authoritative ){ + // no-op in mongod + } + +} diff --git a/s/dbgrid.vcproj b/s/dbgrid.vcproj new file mode 100644 index 0000000..2c8ef85 --- /dev/null +++ b/s/dbgrid.vcproj @@ -0,0 +1,660 @@ +<?xml version="1.0" encoding="Windows-1252"?>
+<VisualStudioProject
+ ProjectType="Visual C++"
+ Version="9.00"
+ Name="mongos"
+ ProjectGUID="{E03717ED-69B4-4D21-BC55-DF6690B585C6}"
+ RootNamespace="dbgrid"
+ Keyword="Win32Proj"
+ TargetFrameworkVersion="196613"
+ >
+ <Platforms>
+ <Platform
+ Name="Win32"
+ />
+ </Platforms>
+ <ToolFiles>
+ </ToolFiles>
+ <Configurations>
+ <Configuration
+ Name="Debug|Win32"
+ OutputDirectory="$(SolutionDir)$(ConfigurationName)"
+ IntermediateDirectory="$(ConfigurationName)"
+ ConfigurationType="1"
+ CharacterSet="1"
+ >
+ <Tool
+ Name="VCPreBuildEventTool"
+ />
+ <Tool
+ Name="VCCustomBuildTool"
+ />
+ <Tool
+ Name="VCXMLDataGeneratorTool"
+ />
+ <Tool
+ Name="VCWebServiceProxyGeneratorTool"
+ />
+ <Tool
+ Name="VCMIDLTool"
+ />
+ <Tool
+ Name="VCCLCompilerTool"
+ Optimization="0"
+ AdditionalIncludeDirectories=""..\pcre-7.4";"C:\Program Files\boost\boost_1_35_0""
+ PreprocessorDefinitions="USE_ASIO;WIN32;_DEBUG;_CONSOLE;_CRT_SECURE_NO_WARNINGS;HAVE_CONFIG_H;PCRE_STATIC"
+ MinimalRebuild="true"
+ BasicRuntimeChecks="3"
+ RuntimeLibrary="3"
+ UsePrecompiledHeader="2"
+ PrecompiledHeaderThrough="stdafx.h"
+ WarningLevel="3"
+ DebugInformationFormat="4"
+ DisableSpecificWarnings="4355;4800"
+ />
+ <Tool
+ Name="VCManagedResourceCompilerTool"
+ />
+ <Tool
+ Name="VCResourceCompilerTool"
+ />
+ <Tool
+ Name="VCPreLinkEventTool"
+ />
+ <Tool
+ Name="VCLinkerTool"
+ AdditionalDependencies="ws2_32.lib"
+ LinkIncremental="2"
+ AdditionalLibraryDirectories=""c:\program files\boost\boost_1_35_0\lib""
+ GenerateDebugInformation="true"
+ SubSystem="1"
+ TargetMachine="1"
+ />
+ <Tool
+ Name="VCALinkTool"
+ />
+ <Tool
+ Name="VCManifestTool"
+ />
+ <Tool
+ Name="VCXDCMakeTool"
+ />
+ <Tool
+ Name="VCBscMakeTool"
+ />
+ <Tool
+ Name="VCFxCopTool"
+ />
+ <Tool
+ Name="VCAppVerifierTool"
+ />
+ <Tool
+ Name="VCPostBuildEventTool"
+ />
+ </Configuration>
+ <Configuration
+ Name="Release|Win32"
+ OutputDirectory="$(SolutionDir)$(ConfigurationName)"
+ IntermediateDirectory="$(ConfigurationName)"
+ ConfigurationType="1"
+ CharacterSet="1"
+ WholeProgramOptimization="1"
+ >
+ <Tool
+ Name="VCPreBuildEventTool"
+ />
+ <Tool
+ Name="VCCustomBuildTool"
+ />
+ <Tool
+ Name="VCXMLDataGeneratorTool"
+ />
+ <Tool
+ Name="VCWebServiceProxyGeneratorTool"
+ />
+ <Tool
+ Name="VCMIDLTool"
+ />
+ <Tool
+ Name="VCCLCompilerTool"
+ Optimization="2"
+ EnableIntrinsicFunctions="true"
+ AdditionalIncludeDirectories=""..\pcre-7.4";"c:\Program Files\boost\boost_1_35_0\""
+ PreprocessorDefinitions="USE_ASIO;WIN32;NDEBUG;_CONSOLE;_CRT_SECURE_NO_WARNINGS;PCRE_STATIC"
+ RuntimeLibrary="2"
+ EnableFunctionLevelLinking="true"
+ UsePrecompiledHeader="2"
+ WarningLevel="3"
+ DebugInformationFormat="3"
+ DisableSpecificWarnings="4355;4800"
+ />
+ <Tool
+ Name="VCManagedResourceCompilerTool"
+ />
+ <Tool
+ Name="VCResourceCompilerTool"
+ />
+ <Tool
+ Name="VCPreLinkEventTool"
+ />
+ <Tool
+ Name="VCLinkerTool"
+ AdditionalDependencies="ws2_32.lib"
+ LinkIncremental="1"
+ AdditionalLibraryDirectories=""c:\Program Files\boost\boost_1_35_0\lib""
+ GenerateDebugInformation="true"
+ SubSystem="1"
+ OptimizeReferences="2"
+ EnableCOMDATFolding="2"
+ TargetMachine="1"
+ />
+ <Tool
+ Name="VCALinkTool"
+ />
+ <Tool
+ Name="VCManifestTool"
+ />
+ <Tool
+ Name="VCXDCMakeTool"
+ />
+ <Tool
+ Name="VCBscMakeTool"
+ />
+ <Tool
+ Name="VCFxCopTool"
+ />
+ <Tool
+ Name="VCAppVerifierTool"
+ />
+ <Tool
+ Name="VCPostBuildEventTool"
+ />
+ </Configuration>
+ <Configuration
+ Name="release_nojni|Win32"
+ OutputDirectory="$(SolutionDir)$(ConfigurationName)"
+ IntermediateDirectory="$(ConfigurationName)"
+ ConfigurationType="1"
+ CharacterSet="1"
+ WholeProgramOptimization="1"
+ >
+ <Tool
+ Name="VCPreBuildEventTool"
+ />
+ <Tool
+ Name="VCCustomBuildTool"
+ />
+ <Tool
+ Name="VCXMLDataGeneratorTool"
+ />
+ <Tool
+ Name="VCWebServiceProxyGeneratorTool"
+ />
+ <Tool
+ Name="VCMIDLTool"
+ />
+ <Tool
+ Name="VCCLCompilerTool"
+ Optimization="2"
+ EnableIntrinsicFunctions="true"
+ AdditionalIncludeDirectories=""..\pcre-7.4";"c:\Program Files\boost\boost_1_35_0\""
+ PreprocessorDefinitions="WIN32;NDEBUG;_CONSOLE;_CRT_SECURE_NO_WARNINGS;PCRE_STATIC"
+ RuntimeLibrary="2"
+ EnableFunctionLevelLinking="true"
+ UsePrecompiledHeader="2"
+ WarningLevel="3"
+ DebugInformationFormat="3"
+ DisableSpecificWarnings="4355"
+ />
+ <Tool
+ Name="VCManagedResourceCompilerTool"
+ />
+ <Tool
+ Name="VCResourceCompilerTool"
+ />
+ <Tool
+ Name="VCPreLinkEventTool"
+ />
+ <Tool
+ Name="VCLinkerTool"
+ AdditionalDependencies="ws2_32.lib"
+ LinkIncremental="1"
+ AdditionalLibraryDirectories=""c:\Program Files\boost\boost_1_35_0\lib""
+ GenerateDebugInformation="true"
+ SubSystem="1"
+ OptimizeReferences="2"
+ EnableCOMDATFolding="2"
+ TargetMachine="1"
+ />
+ <Tool
+ Name="VCALinkTool"
+ />
+ <Tool
+ Name="VCManifestTool"
+ />
+ <Tool
+ Name="VCXDCMakeTool"
+ />
+ <Tool
+ Name="VCBscMakeTool"
+ />
+ <Tool
+ Name="VCFxCopTool"
+ />
+ <Tool
+ Name="VCAppVerifierTool"
+ />
+ <Tool
+ Name="VCPostBuildEventTool"
+ />
+ </Configuration>
+ <Configuration
+ Name="Debug Recstore|Win32"
+ OutputDirectory="$(SolutionDir)$(ConfigurationName)"
+ IntermediateDirectory="$(ConfigurationName)"
+ ConfigurationType="1"
+ CharacterSet="1"
+ >
+ <Tool
+ Name="VCPreBuildEventTool"
+ />
+ <Tool
+ Name="VCCustomBuildTool"
+ />
+ <Tool
+ Name="VCXMLDataGeneratorTool"
+ />
+ <Tool
+ Name="VCWebServiceProxyGeneratorTool"
+ />
+ <Tool
+ Name="VCMIDLTool"
+ />
+ <Tool
+ Name="VCCLCompilerTool"
+ Optimization="0"
+ AdditionalIncludeDirectories=""..\pcre-7.4";"C:\Program Files\boost\boost_1_35_0""
+ PreprocessorDefinitions="USE_ASIO;WIN32;_DEBUG;_CONSOLE;_CRT_SECURE_NO_WARNINGS;HAVE_CONFIG_H;PCRE_STATIC"
+ MinimalRebuild="true"
+ BasicRuntimeChecks="3"
+ RuntimeLibrary="3"
+ UsePrecompiledHeader="2"
+ PrecompiledHeaderThrough="stdafx.h"
+ WarningLevel="3"
+ DebugInformationFormat="4"
+ DisableSpecificWarnings="4355;4800"
+ />
+ <Tool
+ Name="VCManagedResourceCompilerTool"
+ />
+ <Tool
+ Name="VCResourceCompilerTool"
+ />
+ <Tool
+ Name="VCPreLinkEventTool"
+ />
+ <Tool
+ Name="VCLinkerTool"
+ AdditionalDependencies="ws2_32.lib"
+ LinkIncremental="2"
+ AdditionalLibraryDirectories=""c:\program files\boost\boost_1_35_0\lib""
+ GenerateDebugInformation="true"
+ SubSystem="1"
+ TargetMachine="1"
+ />
+ <Tool
+ Name="VCALinkTool"
+ />
+ <Tool
+ Name="VCManifestTool"
+ />
+ <Tool
+ Name="VCXDCMakeTool"
+ />
+ <Tool
+ Name="VCBscMakeTool"
+ />
+ <Tool
+ Name="VCFxCopTool"
+ />
+ <Tool
+ Name="VCAppVerifierTool"
+ />
+ <Tool
+ Name="VCPostBuildEventTool"
+ />
+ </Configuration>
+ </Configurations>
+ <References>
+ </References>
+ <Files>
+ <Filter
+ Name="Source Files"
+ Filter="cpp;c;cc;cxx;def;odl;idl;hpj;bat;asm;asmx"
+ UniqueIdentifier="{4FC737F1-C7A5-4376-A066-2A32D752A2FF}"
+ >
+ <File
+ RelativePath=".\chunk.cpp"
+ >
+ </File>
+ <File
+ RelativePath=".\commands_admin.cpp"
+ >
+ </File>
+ <File
+ RelativePath=".\commands_public.cpp"
+ >
+ </File>
+ <File
+ RelativePath=".\config.cpp"
+ >
+ </File>
+ <File
+ RelativePath=".\cursors.cpp"
+ >
+ </File>
+ <File
+ RelativePath="..\db\queryutil.cpp"
+ >
+ </File>
+ <File
+ RelativePath=".\request.cpp"
+ >
+ </File>
+ <File
+ RelativePath=".\server.cpp"
+ >
+ </File>
+ <File
+ RelativePath=".\shardkey.cpp"
+ >
+ </File>
+ <File
+ RelativePath="..\stdafx.cpp"
+ >
+ <FileConfiguration
+ Name="Debug|Win32"
+ >
+ <Tool
+ Name="VCCLCompilerTool"
+ UsePrecompiledHeader="1"
+ />
+ </FileConfiguration>
+ <FileConfiguration
+ Name="Release|Win32"
+ >
+ <Tool
+ Name="VCCLCompilerTool"
+ UsePrecompiledHeader="1"
+ />
+ </FileConfiguration>
+ <FileConfiguration
+ Name="release_nojni|Win32"
+ >
+ <Tool
+ Name="VCCLCompilerTool"
+ UsePrecompiledHeader="1"
+ />
+ </FileConfiguration>
+ <FileConfiguration
+ Name="Debug Recstore|Win32"
+ >
+ <Tool
+ Name="VCCLCompilerTool"
+ UsePrecompiledHeader="1"
+ />
+ </FileConfiguration>
+ </File>
+ <File
+ RelativePath=".\strategy.cpp"
+ >
+ </File>
+ <File
+ RelativePath=".\strategy_shard.cpp"
+ >
+ </File>
+ <File
+ RelativePath=".\strategy_single.cpp"
+ >
+ </File>
+ <Filter
+ Name="Shared Source Files"
+ >
+ <File
+ RelativePath="..\util\assert_util.cpp"
+ >
+ </File>
+ <File
+ RelativePath="..\util\background.cpp"
+ >
+ </File>
+ <File
+ RelativePath="..\util\base64.cpp"
+ >
+ </File>
+ <File
+ RelativePath="..\db\commands.cpp"
+ >
+ </File>
+ <File
+ RelativePath="..\util\debug_util.cpp"
+ >
+ </File>
+ <File
+ RelativePath="..\db\jsobj.cpp"
+ >
+ </File>
+ <File
+ RelativePath="..\db\json.cpp"
+ >
+ </File>
+ <File
+ RelativePath="..\db\lasterror.cpp"
+ >
+ </File>
+ <File
+ RelativePath="..\util\md5.c"
+ >
+ <FileConfiguration
+ Name="Debug|Win32"
+ >
+ <Tool
+ Name="VCCLCompilerTool"
+ UsePrecompiledHeader="0"
+ />
+ </FileConfiguration>
+ <FileConfiguration
+ Name="Release|Win32"
+ >
+ <Tool
+ Name="VCCLCompilerTool"
+ UsePrecompiledHeader="0"
+ />
+ </FileConfiguration>
+ <FileConfiguration
+ Name="Debug Recstore|Win32"
+ >
+ <Tool
+ Name="VCCLCompilerTool"
+ UsePrecompiledHeader="0"
+ />
+ </FileConfiguration>
+ </File>
+ <File
+ RelativePath="..\util\md5main.cpp"
+ >
+ <FileConfiguration
+ Name="Debug|Win32"
+ >
+ <Tool
+ Name="VCCLCompilerTool"
+ UsePrecompiledHeader="2"
+ />
+ </FileConfiguration>
+ </File>
+ <File
+ RelativePath="..\util\message.cpp"
+ >
+ </File>
+ <File
+ RelativePath="..\util\message_server_asio.cpp"
+ >
+ <FileConfiguration
+ Name="Debug|Win32"
+ >
+ <Tool
+ Name="VCCLCompilerTool"
+ UsePrecompiledHeader="0"
+ />
+ </FileConfiguration>
+ <FileConfiguration
+ Name="Release|Win32"
+ >
+ <Tool
+ Name="VCCLCompilerTool"
+ UsePrecompiledHeader="0"
+ />
+ </FileConfiguration>
+ <FileConfiguration
+ Name="release_nojni|Win32"
+ >
+ <Tool
+ Name="VCCLCompilerTool"
+ UsePrecompiledHeader="0"
+ />
+ </FileConfiguration>
+ <FileConfiguration
+ Name="Debug Recstore|Win32"
+ >
+ <Tool
+ Name="VCCLCompilerTool"
+ UsePrecompiledHeader="0"
+ />
+ </FileConfiguration>
+ </File>
+ <File
+ RelativePath="..\db\nonce.cpp"
+ >
+ </File>
+ <File
+ RelativePath="..\client\parallel.cpp"
+ >
+ </File>
+ <File
+ RelativePath="..\util\sock.cpp"
+ >
+ </File>
+ <File
+ RelativePath="..\util\thread_pool.cpp"
+ >
+ </File>
+ <File
+ RelativePath="..\util\util.cpp"
+ >
+ </File>
+ </Filter>
+ </Filter>
+ <Filter
+ Name="Header Files"
+ Filter="h;hpp;hxx;hm;inl;inc;xsd"
+ UniqueIdentifier="{93995380-89BD-4b04-88EB-625FBE52EBFB}"
+ >
+ <File
+ RelativePath=".\gridconfig.h"
+ >
+ </File>
+ <File
+ RelativePath=".\griddatabase.h"
+ >
+ </File>
+ <File
+ RelativePath=".\shard.h"
+ >
+ </File>
+ <File
+ RelativePath=".\strategy.h"
+ >
+ </File>
+ <Filter
+ Name="Header Shared"
+ >
+ <File
+ RelativePath="..\util\background.h"
+ >
+ </File>
+ <File
+ RelativePath="..\db\commands.h"
+ >
+ </File>
+ <File
+ RelativePath="..\client\connpool.cpp"
+ >
+ </File>
+ <File
+ RelativePath="..\db\dbmessage.h"
+ >
+ </File>
+ <File
+ RelativePath="..\util\goodies.h"
+ >
+ </File>
+ <File
+ RelativePath="..\db\jsobj.h"
+ >
+ </File>
+ <File
+ RelativePath="..\db\json.h"
+ >
+ </File>
+ <File
+ RelativePath="..\stdafx.h"
+ >
+ </File>
+ </Filter>
+ </Filter>
+ <Filter
+ Name="Resource Files"
+ Filter="rc;ico;cur;bmp;dlg;rc2;rct;bin;rgs;gif;jpg;jpeg;jpe;resx;tiff;tif;png;wav"
+ UniqueIdentifier="{67DA6AB6-F800-4c08-8B7A-83BB121AAD01}"
+ >
+ </Filter>
+ <Filter
+ Name="libs_etc"
+ >
+ <File
+ RelativePath="..\..\boostw\boost_1_34_1\boost\config\auto_link.hpp"
+ >
+ </File>
+ <File
+ RelativePath="..\..\boostw\boost_1_34_1\boost\version.hpp"
+ >
+ </File>
+ </Filter>
+ <Filter
+ Name="client"
+ >
+ <File
+ RelativePath="..\client\connpool.h"
+ >
+ </File>
+ <File
+ RelativePath="..\client\dbclient.cpp"
+ >
+ </File>
+ <File
+ RelativePath="..\client\dbclient.h"
+ >
+ </File>
+ <File
+ RelativePath="..\client\model.cpp"
+ >
+ </File>
+ <File
+ RelativePath="..\client\model.h"
+ >
+ </File>
+ </Filter>
+ </Files>
+ <Globals>
+ </Globals>
+</VisualStudioProject>
diff --git a/s/dbgrid.vcxproj b/s/dbgrid.vcxproj new file mode 100644 index 0000000..e997055 --- /dev/null +++ b/s/dbgrid.vcxproj @@ -0,0 +1,201 @@ +<?xml version="1.0" encoding="utf-8"?> +<Project DefaultTargets="Build" ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> + <ItemGroup Label="ProjectConfigurations"> + <ProjectConfiguration Include="Debug Recstore|Win32"> + <Configuration>Debug Recstore</Configuration> + <Platform>Win32</Platform> + </ProjectConfiguration> + <ProjectConfiguration Include="Debug|Win32"> + <Configuration>Debug</Configuration> + <Platform>Win32</Platform> + </ProjectConfiguration> + <ProjectConfiguration Include="Release|Win32"> + <Configuration>Release</Configuration> + <Platform>Win32</Platform> + </ProjectConfiguration> + </ItemGroup> + <PropertyGroup Label="Globals"> + <ProjectName>mongos</ProjectName> + <ProjectGuid>{E03717ED-69B4-4D21-BC55-DF6690B585C6}</ProjectGuid> + <RootNamespace>dbgrid</RootNamespace> + <Keyword>Win32Proj</Keyword> + </PropertyGroup> + <Import Project="$(VCTargetsPath)\Microsoft.Cpp.Default.props" /> + <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug Recstore|Win32'" Label="Configuration"> + <ConfigurationType>Application</ConfigurationType> + <CharacterSet>Unicode</CharacterSet> + </PropertyGroup> + <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'" Label="Configuration"> + <ConfigurationType>Application</ConfigurationType> + <CharacterSet>Unicode</CharacterSet> + <WholeProgramOptimization>true</WholeProgramOptimization> + </PropertyGroup> + <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'" Label="Configuration"> + <ConfigurationType>Application</ConfigurationType> + <CharacterSet>Unicode</CharacterSet> + </PropertyGroup> + <Import Project="$(VCTargetsPath)\Microsoft.Cpp.props" /> + <ImportGroup Label="ExtensionSettings"> + </ImportGroup> + <ImportGroup Condition="'$(Configuration)|$(Platform)'=='Debug Recstore|Win32'" 
Label="PropertySheets"> + <Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" /> + </ImportGroup> + <ImportGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'" Label="PropertySheets"> + <Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" /> + </ImportGroup> + <ImportGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'" Label="PropertySheets"> + <Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" /> + </ImportGroup> + <PropertyGroup Label="UserMacros" /> + <PropertyGroup> + <_ProjectFileVersion>10.0.21006.1</_ProjectFileVersion> + <OutDir Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">$(SolutionDir)$(Configuration)\</OutDir> + <IntDir Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">$(Configuration)\</IntDir> + <LinkIncremental Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</LinkIncremental> + <OutDir Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">$(SolutionDir)$(Configuration)\</OutDir> + <IntDir Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">$(Configuration)\</IntDir> + <LinkIncremental Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">false</LinkIncremental> + <OutDir Condition="'$(Configuration)|$(Platform)'=='Debug Recstore|Win32'">$(SolutionDir)$(Configuration)\</OutDir> + <IntDir Condition="'$(Configuration)|$(Platform)'=='Debug Recstore|Win32'">$(Configuration)\</IntDir> + <LinkIncremental Condition="'$(Configuration)|$(Platform)'=='Debug Recstore|Win32'">true</LinkIncremental> + </PropertyGroup> + <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'"> + <ClCompile> + <Optimization>Disabled</Optimization> + <AdditionalIncludeDirectories>..\pcre-7.4;C:\Program 
Files\boost\boost_1_41_0;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories> + <PreprocessorDefinitions>USE_ASIO;WIN32;_DEBUG;_CONSOLE;_CRT_SECURE_NO_WARNINGS;HAVE_CONFIG_H;PCRE_STATIC;%(PreprocessorDefinitions)</PreprocessorDefinitions> + <MinimalRebuild>true</MinimalRebuild> + <BasicRuntimeChecks>EnableFastChecks</BasicRuntimeChecks> + <RuntimeLibrary>MultiThreadedDebugDLL</RuntimeLibrary> + <PrecompiledHeader>Use</PrecompiledHeader> + <PrecompiledHeaderFile>stdafx.h</PrecompiledHeaderFile> + <WarningLevel>Level3</WarningLevel> + <DebugInformationFormat>EditAndContinue</DebugInformationFormat> + <DisableSpecificWarnings>4355;4800;%(DisableSpecificWarnings)</DisableSpecificWarnings> + </ClCompile> + <Link> + <AdditionalDependencies>ws2_32.lib;%(AdditionalDependencies)</AdditionalDependencies> + <AdditionalLibraryDirectories>c:\program files\boost\boost_1_41_0\lib;%(AdditionalLibraryDirectories)</AdditionalLibraryDirectories> + <GenerateDebugInformation>true</GenerateDebugInformation> + <SubSystem>Console</SubSystem> + <TargetMachine>MachineX86</TargetMachine> + </Link> + </ItemDefinitionGroup> + <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'"> + <ClCompile> + <Optimization>MaxSpeed</Optimization> + <IntrinsicFunctions>true</IntrinsicFunctions> + <PreprocessorDefinitions>USE_ASIO;WIN32;NDEBUG;_CONSOLE;_CRT_SECURE_NO_WARNINGS;PCRE_STATIC;%(PreprocessorDefinitions)</PreprocessorDefinitions> + <RuntimeLibrary>MultiThreadedDLL</RuntimeLibrary> + <FunctionLevelLinking>true</FunctionLevelLinking> + <PrecompiledHeader>Use</PrecompiledHeader> + <WarningLevel>Level3</WarningLevel> + <DebugInformationFormat>ProgramDatabase</DebugInformationFormat> + <DisableSpecificWarnings>4355;4800;%(DisableSpecificWarnings)</DisableSpecificWarnings> + </ClCompile> + <Link> + <AdditionalDependencies>ws2_32.lib;%(AdditionalDependencies)</AdditionalDependencies> + <AdditionalLibraryDirectories>c:\Program 
Files\boost\boost_1_41_0\lib;%(AdditionalLibraryDirectories)</AdditionalLibraryDirectories> + <GenerateDebugInformation>true</GenerateDebugInformation> + <SubSystem>Console</SubSystem> + <OptimizeReferences>true</OptimizeReferences> + <EnableCOMDATFolding>true</EnableCOMDATFolding> + <TargetMachine>MachineX86</TargetMachine> + </Link> + </ItemDefinitionGroup> + <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug Recstore|Win32'"> + <ClCompile> + <Optimization>Disabled</Optimization> + <AdditionalIncludeDirectories>..\pcre-7.4;C:\Program Files\boost\boost_1_41_0;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories> + <PreprocessorDefinitions>USE_ASIO;WIN32;_DEBUG;_CONSOLE;_CRT_SECURE_NO_WARNINGS;HAVE_CONFIG_H;PCRE_STATIC;%(PreprocessorDefinitions)</PreprocessorDefinitions> + <MinimalRebuild>true</MinimalRebuild> + <BasicRuntimeChecks>EnableFastChecks</BasicRuntimeChecks> + <RuntimeLibrary>MultiThreadedDebugDLL</RuntimeLibrary> + <PrecompiledHeader>Use</PrecompiledHeader> + <PrecompiledHeaderFile>stdafx.h</PrecompiledHeaderFile> + <WarningLevel>Level3</WarningLevel> + <DebugInformationFormat>EditAndContinue</DebugInformationFormat> + <DisableSpecificWarnings>4355;4800;%(DisableSpecificWarnings)</DisableSpecificWarnings> + </ClCompile> + <Link> + <AdditionalDependencies>ws2_32.lib;%(AdditionalDependencies)</AdditionalDependencies> + <AdditionalLibraryDirectories>c:\program files\boost\boost_1_41_0\lib;%(AdditionalLibraryDirectories)</AdditionalLibraryDirectories> + <GenerateDebugInformation>true</GenerateDebugInformation> + <SubSystem>Console</SubSystem> + <TargetMachine>MachineX86</TargetMachine> + </Link> + </ItemDefinitionGroup> + <ItemGroup> + <ClCompile Include="chunk.cpp" /> + <ClCompile Include="commands_admin.cpp" /> + <ClCompile Include="commands_public.cpp" /> + <ClCompile Include="config.cpp" /> + <ClCompile Include="cursors.cpp" /> + <ClCompile Include="..\db\queryutil.cpp" /> + <ClCompile Include="request.cpp" /> + <ClCompile 
Include="server.cpp" /> + <ClCompile Include="shardkey.cpp" /> + <ClCompile Include="..\stdafx.cpp"> + <PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Debug Recstore|Win32'">Create</PrecompiledHeader> + <PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">Create</PrecompiledHeader> + <PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">Create</PrecompiledHeader> + </ClCompile> + <ClCompile Include="strategy.cpp" /> + <ClCompile Include="strategy_shard.cpp" /> + <ClCompile Include="strategy_single.cpp" /> + <ClCompile Include="..\util\assert_util.cpp" /> + <ClCompile Include="..\util\background.cpp" /> + <ClCompile Include="..\util\base64.cpp" /> + <ClCompile Include="..\db\commands.cpp" /> + <ClCompile Include="..\util\debug_util.cpp" /> + <ClCompile Include="..\db\jsobj.cpp" /> + <ClCompile Include="..\db\json.cpp" /> + <ClCompile Include="..\db\lasterror.cpp" /> + <ClCompile Include="..\util\md5.c"> + <PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Debug Recstore|Win32'"> + </PrecompiledHeader> + <PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'"> + </PrecompiledHeader> + <PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Release|Win32'"> + </PrecompiledHeader> + </ClCompile> + <ClCompile Include="..\util\md5main.cpp"> + <PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">Use</PrecompiledHeader> + </ClCompile> + <ClCompile Include="..\util\message.cpp" /> + <ClCompile Include="..\util\message_server_asio.cpp"> + <PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Debug Recstore|Win32'"> + </PrecompiledHeader> + <PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'"> + </PrecompiledHeader> + <PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Release|Win32'"> + </PrecompiledHeader> + </ClCompile> + <ClCompile Include="..\db\nonce.cpp" /> + <ClCompile 
Include="..\client\parallel.cpp" /> + <ClCompile Include="..\util\sock.cpp" /> + <ClCompile Include="..\util\util.cpp" /> + <ClCompile Include="..\client\connpool.cpp" /> + <ClCompile Include="..\client\dbclient.cpp" /> + <ClCompile Include="..\client\model.cpp" /> + </ItemGroup> + <ItemGroup> + <ClInclude Include="gridconfig.h" /> + <ClInclude Include="griddatabase.h" /> + <ClInclude Include="shard.h" /> + <ClInclude Include="strategy.h" /> + <ClInclude Include="..\util\background.h" /> + <ClInclude Include="..\db\commands.h" /> + <ClInclude Include="..\db\dbmessage.h" /> + <ClInclude Include="..\util\goodies.h" /> + <ClInclude Include="..\db\jsobj.h" /> + <ClInclude Include="..\db\json.h" /> + <ClInclude Include="..\stdafx.h" /> + <ClInclude Include="..\client\connpool.h" /> + <ClInclude Include="..\client\dbclient.h" /> + <ClInclude Include="..\client\model.h" /> + </ItemGroup> + <Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" /> + <ImportGroup Label="ExtensionTargets"> + </ImportGroup> +</Project> diff --git a/s/request.cpp b/s/request.cpp new file mode 100644 index 0000000..8bebd64 --- /dev/null +++ b/s/request.cpp @@ -0,0 +1,175 @@ +/* dbgrid/request.cpp + + Top level handling of requests (operations such as query, insert, ...) +*/ + +/** +* Copyright (C) 2008 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. 
+*/ + +#include "stdafx.h" +#include "server.h" +#include "../db/commands.h" +#include "../db/dbmessage.h" +#include "../client/connpool.h" + +#include "request.h" +#include "config.h" +#include "chunk.h" + +namespace mongo { + + Request::Request( Message& m, AbstractMessagingPort* p ) : + _m(m) , _d( m ) , _p(p){ + + assert( _d.getns() ); + _id = _m.data->id; + + _clientId = p ? p->remotePort() << 16 : 0; + _clientInfo = ClientInfo::get( _clientId ); + _clientInfo->newRequest(); + + reset(); + } + + void Request::reset( bool reload ){ + _config = grid.getDBConfig( getns() ); + if ( reload ) + uassert( 10192 , "db config reload failed!" , _config->reload() ); + + if ( _config->isSharded( getns() ) ){ + _chunkManager = _config->getChunkManager( getns() , reload ); + uassert( 10193 , (string)"no shard info for: " + getns() , _chunkManager ); + } + else { + _chunkManager = 0; + } + + _m.data->id = _id; + + } + + string Request::singleServerName(){ + if ( _chunkManager ){ + if ( _chunkManager->numChunks() > 1 ) + throw UserException( 8060 , "can't call singleServerName on a sharded collection" ); + return _chunkManager->findChunk( _chunkManager->getShardKey().globalMin() ).getShard(); + } + string s = _config->getShard( getns() ); + uassert( 10194 , "can't call singleServerName on a sharded collection!" 
, s.size() > 0 ); + return s; + } + + void Request::process( int attempt ){ + + log(2) << "Request::process ns: " << getns() << " msg id:" << (int)(_m.data->id) << " attempt: " << attempt << endl; + + int op = _m.data->operation(); + assert( op > dbMsg ); + + Strategy * s = SINGLE; + + _d.markSet(); + + if ( _chunkManager ){ + s = SHARDED; + } + + if ( op == dbQuery ) { + try { + s->queryOp( *this ); + } + catch ( StaleConfigException& staleConfig ){ + log() << staleConfig.what() << " attempt: " << attempt << endl; + uassert( 10195 , "too many attempts to update config, failing" , attempt < 5 ); + + sleepsecs( attempt ); + reset( true ); + _d.markReset(); + process( attempt + 1 ); + return; + } + } + else if ( op == dbGetMore ) { + s->getMore( *this ); + } + else { + s->writeOp( op, *this ); + } + } + + + ClientInfo::ClientInfo( int clientId ) : _id( clientId ){ + _cur = &_a; + _prev = &_b; + newRequest(); + } + + ClientInfo::~ClientInfo(){ + boostlock lk( _clientsLock ); + ClientCache::iterator i = _clients.find( _id ); + if ( i != _clients.end() ){ + _clients.erase( i ); + } + } + + void ClientInfo::addShard( const string& shard ){ + _cur->insert( shard ); + } + + void ClientInfo::newRequest(){ + _lastAccess = (int) time(0); + + set<string> * temp = _cur; + _cur = _prev; + _prev = temp; + _cur->clear(); + } + + void ClientInfo::disconnect(){ + _lastAccess = 0; + } + + ClientInfo * ClientInfo::get( int clientId , bool create ){ + + if ( ! clientId ) + clientId = getClientId(); + + if ( ! clientId ){ + ClientInfo * info = _tlInfo.get(); + if ( ! info ){ + info = new ClientInfo( 0 ); + _tlInfo.reset( info ); + } + info->newRequest(); + return info; + } + + boostlock lk( _clientsLock ); + ClientCache::iterator i = _clients.find( clientId ); + if ( i != _clients.end() ) + return i->second; + if ( ! 
create ) + return 0; + ClientInfo * info = new ClientInfo( clientId ); + _clients[clientId] = info; + return info; + } + + map<int,ClientInfo*> ClientInfo::_clients; + boost::mutex ClientInfo::_clientsLock; + boost::thread_specific_ptr<ClientInfo> ClientInfo::_tlInfo; + +} // namespace mongo diff --git a/s/request.h b/s/request.h new file mode 100644 index 0000000..689216c --- /dev/null +++ b/s/request.h @@ -0,0 +1,120 @@ +// request.h + +#pragma once + +#include "../stdafx.h" +#include "../util/message.h" +#include "../db/dbmessage.h" +#include "config.h" +#include "util.h" + +namespace mongo { + + class ClientInfo; + + class Request : boost::noncopyable { + public: + Request( Message& m, AbstractMessagingPort* p ); + + // ---- message info ----- + + + const char * getns(){ + return _d.getns(); + } + int op(){ + return _m.data->operation(); + } + bool expectResponse(){ + return op() == dbQuery || op() == dbGetMore; + } + + MSGID id(){ + return _id; + } + + DBConfig * getConfig(){ + return _config; + } + bool isShardingEnabled(){ + return _config->isShardingEnabled(); + } + + ChunkManager * getChunkManager(){ + return _chunkManager; + } + + int getClientId(){ + return _clientId; + } + ClientInfo * getClientInfo(){ + return _clientInfo; + } + + // ---- remote location info ----- + + + string singleServerName(); + + const char * primaryName(){ + return _config->getPrimary().c_str(); + } + + // ---- low level access ---- + + void reply( Message & response ){ + _p->reply( _m , response , _id ); + } + + Message& m(){ return _m; } + DbMessage& d(){ return _d; } + AbstractMessagingPort* p(){ return _p; } + + void process( int attempt = 0 ); + + private: + + void reset( bool reload=false ); + + Message& _m; + DbMessage _d; + AbstractMessagingPort* _p; + + MSGID _id; + DBConfig * _config; + ChunkManager * _chunkManager; + + int _clientId; + ClientInfo * _clientInfo; + }; + + typedef map<int,ClientInfo*> ClientCache; + + class ClientInfo { + public: + ClientInfo( int 
clientId ); + ~ClientInfo(); + + void addShard( const string& shard ); + set<string> * getPrev() const { return _prev; }; + + void newRequest(); + void disconnect(); + + static ClientInfo * get( int clientId = 0 , bool create = true ); + + private: + int _id; + set<string> _a; + set<string> _b; + set<string> * _cur; + set<string> * _prev; + int _lastAccess; + + static boost::mutex _clientsLock; + static ClientCache _clients; + static boost::thread_specific_ptr<ClientInfo> _tlInfo; + }; +} + +#include "strategy.h" diff --git a/s/s_only.cpp b/s/s_only.cpp new file mode 100644 index 0000000..d692ff2 --- /dev/null +++ b/s/s_only.cpp @@ -0,0 +1,29 @@ +// s_only.cpp + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "../stdafx.h" +#include "../client/dbclient.h" +#include "../db/dbhelpers.h" + +namespace mongo { + + auto_ptr<CursorIterator> Helpers::find( const char *ns , BSONObj query , bool requireIndex ){ + uassert( 10196 , "Helpers::find can't be used in mongos" , 0 ); + auto_ptr<CursorIterator> i; + return i; + } +} diff --git a/s/server.cpp b/s/server.cpp new file mode 100644 index 0000000..4868caf --- /dev/null +++ b/s/server.cpp @@ -0,0 +1,202 @@ +// server.cpp + +/** +* Copyright (C) 2008 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. 
+* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "stdafx.h" +#include "../util/message.h" +#include "../util/unittest.h" +#include "../client/connpool.h" +#include "../util/message_server.h" + +#include "server.h" +#include "request.h" +#include "config.h" +#include "chunk.h" + +namespace mongo { + + Database *database = 0; + string ourHostname; + OID serverID; + bool dbexitCalled = false; + CmdLine cmdLine; + + bool inShutdown(){ + return dbexitCalled; + } + + string getDbContext() { + return "?"; + } + + bool haveLocalShardingInfo( const string& ns ){ + assert( 0 ); + return false; + } + + void usage( char * argv[] ){ + out() << argv[0] << " usage:\n\n"; + out() << " -v+ verbose\n"; + out() << " --port <portno>\n"; + out() << " --configdb <configdbname> [<configdbname>...]\n"; + out() << endl; + } + + class ShardingConnectionHook : public DBConnectionHook { + public: + virtual void onCreate( DBClientBase * conn ){ + conn->simpleCommand( "admin" , 0 , "switchtoclienterrors" ); + } + virtual void onHandedOut( DBClientBase * conn ){ + ClientInfo::get()->addShard( conn->getServerAddress() ); + } + } shardingConnectionHook; + + class ShardedMessageHandler : public MessageHandler { + public: + virtual ~ShardedMessageHandler(){} + virtual void process( Message& m , AbstractMessagingPort* p ){ + Request r( m , p ); + if ( logLevel > 5 ){ + log(5) << "client id: " << hex << r.getClientId() << "\t" << r.getns() << "\t" << dec << r.op() << endl; + } + try { + setClientId( r.getClientId() ); + r.process(); + } + catch ( DBException& e ){ + m.data->id = r.id(); + log() << "UserException: " << e.what() << endl; 
+ if ( r.expectResponse() ){ + BSONObj err = BSON( "$err" << e.what() ); + replyToQuery( QueryResult::ResultFlag_ErrSet, p , m , err ); + } + } + } + }; + + void init(){ + serverID.init(); + setupSIGTRAPforGDB(); + } + + void start() { + log() << "waiting for connections on port " << cmdLine.port << endl; + //DbGridListener l(port); + //l.listen(); + ShardedMessageHandler handler; + MessageServer * server = createServer( cmdLine.port , &handler ); + server->run(); + } + + DBClientBase *createDirectClient(){ + uassert( 10197 , "createDirectClient not implemented for sharding yet" , 0 ); + return 0; + } + +} // namespace mongo + +using namespace mongo; + +int main(int argc, char* argv[], char *envp[] ) { + + bool justTests = false; + vector<string> configdbs; + + for (int i = 1; i < argc; i++) { + if ( argv[i] == 0 ) continue; + string s = argv[i]; + if ( s == "--port" ) { + cmdLine.port = atoi(argv[++i]); + } + else if ( s == "--configdb" ) { + + while ( ++i < argc ) + configdbs.push_back(argv[i]); + + if ( configdbs.size() == 0 ) { + out() << "error: no args for --configdb\n"; + return 4; + } + + if ( configdbs.size() > 2 ) { + out() << "error: --configdb does not support more than 2 parameters yet\n"; + return 5; + } + } + else if ( s.find( "-v" ) == 0 ){ + logLevel = s.size() - 1; + } + else if ( s == "--test" ) { + justTests = true; + logLevel = 5; + } + else { + usage( argv ); + return 3; + } + } + + if ( justTests ){ + UnitTest::runTests(); + cout << "tests passed" << endl; + return 0; + } + + pool.addHook( &shardingConnectionHook ); + + if ( argc <= 1 ) { + usage( argv ); + return 3; + } + + bool ok = cmdLine.port != 0 && configdbs.size(); + + if ( !ok ) { + usage( argv ); + return 1; + } + + log() << argv[0] << " v0.3- (alpha 3t) starting (--help for usage)" << endl; + printGitVersion(); + printSysInfo(); + + if ( ! 
configServer.init( configdbs ) ){ + cout << "couldn't connectd to config db" << endl; + return 7; + } + + assert( configServer.ok() ); + + int configError = configServer.checkConfigVersion(); + if ( configError ){ + cout << "config server error: " << configError << endl; + return configError; + } + + init(); + start(); + dbexit( EXIT_CLEAN ); + return 0; +} + +#undef exit +void mongo::dbexit( ExitCode rc, const char *why) { + dbexitCalled = true; + log() << "dbexit: " << why << " rc:" << rc << endl; + ::exit(rc); +} diff --git a/s/server.h b/s/server.h new file mode 100644 index 0000000..067afeb --- /dev/null +++ b/s/server.h @@ -0,0 +1,30 @@ +// server.h + +/** +* Copyright (C) 2008 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include <string> +#include "../util/message.h" +#include "../db/jsobj.h" + +namespace mongo { + + extern std::string ourHostname; + extern OID serverID; + + // from request.cpp + void processRequest(Message& m, MessagingPort& p); +} diff --git a/s/shardkey.cpp b/s/shardkey.cpp new file mode 100644 index 0000000..15cf7b9 --- /dev/null +++ b/s/shardkey.cpp @@ -0,0 +1,320 @@ +// shardkey.cpp + +/** +* Copyright (C) 2008 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. 
+* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "stdafx.h" +#include "chunk.h" +#include "../db/jsobj.h" +#include "../util/unittest.h" + +/** + TODO: this only works with numbers right now + this is very temporary, need to make work with anything +*/ + +namespace mongo { + void minForPat(BSONObjBuilder& out, const BSONObj& pat){ + BSONElement e = pat.firstElement(); + if (e.type() == Object){ + BSONObjBuilder sub; + minForPat(sub, e.embeddedObject()); + out.append(e.fieldName(), sub.obj()); + } else { + out.appendMinKey(e.fieldName()); + } + } + + void maxForPat(BSONObjBuilder& out, const BSONObj& pat){ + BSONElement e = pat.firstElement(); + if (e.type() == Object){ + BSONObjBuilder sub; + maxForPat(sub, e.embeddedObject()); + out.append(e.fieldName(), sub.obj()); + } else { + out.appendMaxKey(e.fieldName()); + } + } + + ShardKeyPattern::ShardKeyPattern( BSONObj p ) : pattern( p.getOwned() ) { + pattern.getFieldNames(patternfields); + + BSONObjBuilder min; + minForPat(min, pattern); + gMin = min.obj(); + + BSONObjBuilder max; + maxForPat(max, pattern); + gMax = max.obj(); + } + + int ShardKeyPattern::compare( const BSONObj& lObject , const BSONObj& rObject ) { + BSONObj L = extractKey(lObject); + uassert( 10198 , "left object doesn't have shard key", !L.isEmpty()); + BSONObj R = extractKey(rObject); + uassert( 10199 , "right object doesn't have shard key", !R.isEmpty()); + return L.woCompare(R); + } + + bool ShardKeyPattern::hasShardKey( const BSONObj& obj ) { + /* this is written s.t. if obj has lots of fields, if the shard key fields are early, + it is fast. 
so a bit more work to try to be semi-fast. + */ + + for(set<string>::iterator it = patternfields.begin(); it != patternfields.end(); ++it){ + if(obj.getFieldDotted(it->c_str()).eoo()) + return false; + } + return true; + } + + /** @return true if shard s is relevant for query q. + + Example: + q: { x : 3 } + *this: { x : 1 } + s: x:2..x:7 + -> true + */ + + bool ShardKeyPattern::relevant(const BSONObj& query, const BSONObj& L, const BSONObj& R) { + BSONObj q = extractKey( query ); + if( q.isEmpty() ) + return true; + + BSONElement e = q.firstElement(); + assert( !e.eoo() ) ; + + if( e.type() == RegEx ) { + /* todo: if starts with ^, we could be smarter here */ + return true; + } + + if( e.type() == Object ) { + BSONObjIterator j(e.embeddedObject()); + BSONElement LE = L.firstElement(); // todo compound keys + BSONElement RE = R.firstElement(); // todo compound keys + while( 1 ) { + BSONElement f = j.next(); + if( f.eoo() ) + break; + int op = f.getGtLtOp(); + switch( op ) { + case BSONObj::LT: + if( f.woCompare(LE, false) <= 0 ) + return false; + break; + case BSONObj::LTE: + if( f.woCompare(LE, false) < 0 ) + return false; + break; + case BSONObj::GT: + case BSONObj::GTE: + if( f.woCompare(RE, false) >= 0 ) + return false; + break; + case BSONObj::opIN: + case BSONObj::NE: + case BSONObj::opSIZE: + massert( 10423 , "not implemented yet relevant()", false); + case BSONObj::Equality: + goto normal; + default: + massert( 10424 , "bad operator in relevant()?", false); + } + } + return true; + } +normal: + return L.woCompare(q) <= 0 && R.woCompare(q) > 0; + } + + bool ShardKeyPattern::relevantForQuery( const BSONObj& query , Chunk * chunk ){ + massert( 10425 , "not done for compound patterns", patternfields.size() == 1); + + bool rel = relevant(query, chunk->getMin(), chunk->getMax()); + if( ! hasShardKey( query ) ) + assert(rel); + + return rel; + } + + /** + returns a query that filters results only for the range desired, i.e. 
    /**
       Can results sorted by 'sort' be produced in shard key order
       (possibly reversed)?

       Example
         sort:   { ts: -1 }
         *this:  { ts: 1 }
         -> -1

       @return
         0 if sort either doesn't have all the fields or has extra fields
         < 0 if sort is the key pattern with every direction reversed (descending)
         > 0 if sort matches a prefix of the key pattern exactly (ascending)
    */
    int ShardKeyPattern::canOrder( const BSONObj& sort ){
        // e.g.:
        //   sort { a : 1 , b : -1 }
        //   pattern { a : -1, b : 1, c : 1 }
        //   -> -1
        // walk sort and pattern in parallel; all fields must agree in name,
        // and the directions must be either all same (dir=1) or all opposite (dir=-1)

        int dir = 0;

        BSONObjIterator s(sort);
        BSONObjIterator p(pattern);
        while( 1 ) {
            BSONElement e = s.next();
            if( e.eoo() )
                break;
            if( !p.moreWithEOO() )
                return 0; // sort has more fields than the pattern
            BSONElement ep = p.next();
            bool same = e == ep;
            if( !same ) {
                if( strcmp(e.fieldName(), ep.fieldName()) != 0 )
                    return 0;
                // same name, but opposite direction
                if( dir == -1 )
                    ;  // ok
                else if( dir == 1 )
                    return 0; // wrong direction for a 2nd field
                else // dir == 0, initial pass
                    dir = -1;
            }
            else {
                // fields are the same
                if( dir == -1 )
                    return 0; // wrong direction
                dir = 1;
            }
        }

        return dir;
    }
        void getfilt() {
            // getFilter() should build { key: { $gte: <minkey>, $lt: <maxkey> } },
            // extracting the shard key value even when extra fields (z) are present.
            ShardKeyPattern k( BSON( "key" << 1 ) );
            BSONObjBuilder b;
            k.getFilter(b, fromjson("{z:3,key:30}"), fromjson("{key:90}"));
            BSONObj x = fromjson("{ key: { $gte: 30, $lt: 90 } }");
            assert( x.woEqual(b.obj()) );
        }
        // entry point: exercises compare/hasShardKey/canOrder/getFilter/relevantForQuery
        // on a simple single-field key { key : 1 } plus the compound-key sub-tests
        void run(){
            extractkeytest();

            ShardKeyPattern k( BSON( "key" << 1 ) );

            BSONObj min = k.globalMin();

//            cout << min.jsonString(TenGen) << endl;

            BSONObj max = k.globalMax();

            BSONObj k1 = BSON( "key" << 5 );

            // globalMin < any value < globalMax; compare is reflexive
            assert( k.compare( min , max ) < 0 );
            assert( k.compare( min , k1 ) < 0 );
            assert( k.compare( max , min ) > 0 );
            assert( k.compare( min , min ) == 0 );

            hasshardkeytest();
            assert( k.hasShardKey( k1 ) );
            assert( ! k.hasShardKey( BSON( "key2" << 1 ) ) );

            BSONObj a = k1;
            BSONObj b = BSON( "key" << 999 );

            assert( k.compare(a,b) < 0 );

            // canOrder: 1 = ascending match, 0 = unrelated field, -1 = reversed
            assert( k.canOrder( fromjson("{key:1}") ) == 1 );
            assert( k.canOrder( fromjson("{zz:1}") ) == 0 );
            assert( k.canOrder( fromjson("{key:-1}") ) == -1 );

            testCanOrder();
            getfilt();
            rfq();
            // add middle multitype tests
        }
+ */ + class ShardKeyPattern { + public: + ShardKeyPattern( BSONObj p = BSONObj() ); + + /** + global min is the lowest possible value for this key + e.g. { num : MinKey } + */ + BSONObj globalMin() const { return gMin; } + + /** + global max is the highest possible value for this key + */ + BSONObj globalMax() const { return gMax; } + + bool isGlobalMin( const BSONObj& k ){ + return k.woCompare( globalMin() ) == 0; + } + + bool isGlobalMax( const BSONObj& k ){ + return k.woCompare( globalMax() ) == 0; + } + + bool isGlobal( const BSONObj& k ){ + return isGlobalMin( k ) || isGlobalMax( k ); + } + + /** compare shard keys from the objects specified + l < r negative + l == r 0 + l > r positive + */ + int compare( const BSONObj& l , const BSONObj& r ); + + /** + @return whether or not obj has all fields in this shard key pattern + e.g. + ShardKey({num:1}).hasShardKey({ name:"joe", num:3 }) is true + */ + bool hasShardKey( const BSONObj& obj ); + + /** + returns a query that filters results only for the range desired, i.e. returns + { "field" : { $gte: keyval(min), $lt: keyval(max) } } + */ + void getFilter( BSONObjBuilder& b , const BSONObj& min, const BSONObj& max ); + + /** @return true if shard s is relevant for query q. + + Example: + q: { x : 3 } + *this: { x : 1 } + s: x:2..x:7 + -> true + */ + bool relevantForQuery( const BSONObj& q , Chunk * s ); + + /** + Returns if the given sort pattern can be ordered by the shard key pattern. 
+ Example + sort: { ts: -1 } + *this: { ts:1 } + -> -1 + + @return + 0 if sort either doesn't have all the fields or has extra fields + < 0 if sort is descending + > 1 if sort is ascending + */ + int canOrder( const BSONObj& sort ); + + BSONObj key() { return pattern; } + + string toString() const; + + BSONObj extractKey(const BSONObj& from) const; + + bool partOfShardKey(const string& key ) const { + return patternfields.count( key ) > 0; + } + + operator string() const { + return pattern.toString(); + } + private: + BSONObj pattern; + BSONObj gMin; + BSONObj gMax; + + /* question: better to have patternfields precomputed or not? depends on if we use copy contructor often. */ + set<string> patternfields; + bool relevant(const BSONObj& query, const BSONObj& L, const BSONObj& R); + }; + + inline BSONObj ShardKeyPattern::extractKey(const BSONObj& from) const { + return from.extractFields(pattern); + } + +} diff --git a/s/strategy.cpp b/s/strategy.cpp new file mode 100644 index 0000000..b485bd2 --- /dev/null +++ b/s/strategy.cpp @@ -0,0 +1,221 @@ +// stragegy.cpp + +#include "stdafx.h" +#include "request.h" +#include "../util/background.h" +#include "../client/connpool.h" +#include "../db/commands.h" +#include "server.h" + +namespace mongo { + + // ----- Strategy ------ + + void Strategy::doWrite( int op , Request& r , string server ){ + ScopedDbConnection dbcon( server ); + DBClientBase &_c = dbcon.conn(); + + /* TODO FIX - do not case and call DBClientBase::say() */ + DBClientConnection&c = dynamic_cast<DBClientConnection&>(_c); + c.port().say( r.m() ); + + dbcon.done(); + } + + void Strategy::doQuery( Request& r , string server ){ + try{ + ScopedDbConnection dbcon( server ); + DBClientBase &_c = dbcon.conn(); + + checkShardVersion( _c , r.getns() ); + + // TODO: This will not work with Paired connections. Fix. 
    /** Insert a single object into ns on the given server, making sure the
        connection's shard version is up to date first. */
    void Strategy::insert( string server , const char * ns , const BSONObj& obj ){
        ScopedDbConnection dbcon( server );
        checkShardVersion( dbcon.conn() , ns );
        dbcon->insert( ns , obj );
        dbcon.done();
    }
" << result << endl; + conn.done(); + continue; + } + + } + + log(1) << "writebacklisten result: " << result << endl; + + BSONObj data = result.getObjectField( "data" ); + if ( data.getBoolField( "writeBack" ) ){ + string ns = data["ns"].valuestrsafe(); + + int len; + + Message m( (void*)data["msg"].binData( len ) , false ); + massert( 10427 , "invalid writeback message" , m.data->valid() ); + + grid.getDBConfig( ns )->getChunkManager( ns , true ); + + Request r( m , 0 ); + r.process(); + } + else { + log() << "unknown writeBack result: " << result << endl; + } + + conn.done(); + secsToSleep = 0; + } + catch ( std::exception e ){ + log() << "WriteBackListener exception : " << e.what() << endl; + } + catch ( ... ){ + log() << "WriteBackListener uncaught exception!" << endl; + } + secsToSleep++; + sleepsecs(secsToSleep); + if ( secsToSleep > 10 ) + secsToSleep = 0; + } + } + + private: + string _addr; + static map<string,WriteBackListener*> _cache; + + public: + static void init( DBClientBase& conn ){ + WriteBackListener*& l = _cache[conn.getServerAddress()]; + if ( l ) + return; + l = new WriteBackListener( conn.getServerAddress() ); + l->go(); + } + + }; + + map<string,WriteBackListener*> WriteBackListener::_cache; + + + void checkShardVersion( DBClientBase& conn , const string& ns , bool authoritative ){ + // TODO: cache, optimize, etc... + + WriteBackListener::init( conn ); + + DBConfig * conf = grid.getDBConfig( ns ); + if ( ! 
conf ) + return; + + ShardChunkVersion version = 0; + unsigned long long officialSequenceNumber = 0; + + if ( conf->isSharded( ns ) ){ + ChunkManager * manager = conf->getChunkManager( ns , authoritative ); + officialSequenceNumber = manager->getSequenceNumber(); + version = manager->getVersion( conn.getServerAddress() ); + } + + unsigned long long & sequenceNumber = checkShardVersionLastSequence[ &conn ]; + if ( officialSequenceNumber == sequenceNumber ) + return; + + log(2) << " have to set shard version for conn: " << &conn << " ns:" << ns << " my last seq: " << sequenceNumber << " current: " << officialSequenceNumber << endl; + + BSONObj result; + if ( setShardVersion( conn , ns , version , authoritative , result ) ){ + // success! + log(1) << " setShardVersion success!" << endl; + sequenceNumber = officialSequenceNumber; + return; + } + + log(1) << " setShardVersion failed!\n" << result << endl; + + if ( result.getBoolField( "need_authoritative" ) ) + massert( 10428 , "need_authoritative set but in authoritative mode already" , ! authoritative ); + + if ( ! authoritative ){ + checkShardVersion( conn , ns , 1 ); + return; + } + + log(1) << " setShardVersion failed: " << result << endl; + massert( 10429 , "setShardVersion failed!" 
, 0 ); + } + + bool setShardVersion( DBClientBase & conn , const string& ns , ShardChunkVersion version , bool authoritative , BSONObj& result ){ + + BSONObjBuilder cmdBuilder; + cmdBuilder.append( "setShardVersion" , ns.c_str() ); + cmdBuilder.append( "configdb" , configServer.modelServer() ); + cmdBuilder.appendTimestamp( "version" , version ); + cmdBuilder.appendOID( "serverID" , &serverID ); + if ( authoritative ) + cmdBuilder.appendBool( "authoritative" , 1 ); + BSONObj cmd = cmdBuilder.obj(); + + log(1) << " setShardVersion " << conn.getServerAddress() << " " << ns << " " << cmd << " " << &conn << endl; + + return conn.runCommand( "admin" , cmd , result ); + } + + bool lockNamespaceOnServer( const string& server , const string& ns ){ + ScopedDbConnection conn( server ); + bool res = lockNamespaceOnServer( conn.conn() , ns ); + conn.done(); + return res; + } + + bool lockNamespaceOnServer( DBClientBase& conn , const string& ns ){ + BSONObj lockResult; + return setShardVersion( conn , ns , grid.getNextOpTime() , true , lockResult ); + } + + +} diff --git a/s/strategy.h b/s/strategy.h new file mode 100644 index 0000000..e4b93b5 --- /dev/null +++ b/s/strategy.h @@ -0,0 +1,36 @@ +// strategy.h + +#pragma once + +#include "../stdafx.h" +#include "chunk.h" +#include "request.h" + +namespace mongo { + + class Strategy { + public: + Strategy(){} + virtual ~Strategy() {} + virtual void queryOp( Request& r ) = 0; + virtual void getMore( Request& r ) = 0; + virtual void writeOp( int op , Request& r ) = 0; + + protected: + void doWrite( int op , Request& r , string server ); + void doQuery( Request& r , string server ); + + void insert( string server , const char * ns , const BSONObj& obj ); + + }; + + extern Strategy * SINGLE; + extern Strategy * SHARDED; + + bool setShardVersion( DBClientBase & conn , const string& ns , ShardChunkVersion version , bool authoritative , BSONObj& result ); + + bool lockNamespaceOnServer( const string& server , const string& ns ); + bool 
lockNamespaceOnServer( DBClientBase& conn , const string& ns ); + +} + diff --git a/s/strategy_shard.cpp b/s/strategy_shard.cpp new file mode 100644 index 0000000..34cf226 --- /dev/null +++ b/s/strategy_shard.cpp @@ -0,0 +1,260 @@ +// strategy_sharded.cpp + +#include "stdafx.h" +#include "request.h" +#include "chunk.h" +#include "cursors.h" +#include "../client/connpool.h" +#include "../db/commands.h" + +// error codes 8010-8040 + +namespace mongo { + + class ShardStrategy : public Strategy { + + virtual void queryOp( Request& r ){ + QueryMessage q( r.d() ); + + log(3) << "shard query: " << q.ns << " " << q.query << endl; + + if ( q.ntoreturn == 1 && strstr(q.ns, ".$cmd") ) + throw UserException( 8010 , "something is wrong, shouldn't see a command here" ); + + ChunkManager * info = r.getChunkManager(); + assert( info ); + + Query query( q.query ); + + vector<Chunk*> shards; + info->getChunksForQuery( shards , query.getFilter() ); + + set<ServerAndQuery> servers; + map<string,int> serverCounts; + for ( vector<Chunk*>::iterator i = shards.begin(); i != shards.end(); i++ ){ + servers.insert( ServerAndQuery( (*i)->getShard() , (*i)->getFilter() ) ); + int& num = serverCounts[(*i)->getShard()]; + num++; + } + + if ( logLevel > 4 ){ + StringBuilder ss; + ss << " shard query servers: " << servers.size() << "\n"; + for ( set<ServerAndQuery>::iterator i = servers.begin(); i!=servers.end(); i++ ){ + const ServerAndQuery& s = *i; + ss << " " << s.toString() << "\n"; + } + log() << ss.str(); + } + + ClusteredCursor * cursor = 0; + + BSONObj sort = query.getSort(); + + if ( sort.isEmpty() ){ + // 1. no sort, can just hit them in serial + cursor = new SerialServerClusteredCursor( servers , q ); + } + else { + int shardKeyOrder = info->getShardKey().canOrder( sort ); + if ( shardKeyOrder ){ + // 2. 
        /** Serve an OP_GET_MORE against a sharded cursor previously stored in
            cursorCache by queryOp. Replies CursorNotFound if the id is unknown;
            deletes and unregisters the cursor once it is exhausted. */
        virtual void getMore( Request& r ){
            int ntoreturn = r.d().pullInt();
            long long id = r.d().pullInt64();

            log(6) << "want cursor : " << id << endl;

            ShardedClientCursor * cursor = cursorCache.get( id );
            if ( ! cursor ){
                log(6) << "\t invalid cursor :(" << endl;
                replyToQuery( QueryResult::ResultFlag_CursorNotFound , r.p() , r.m() , 0 , 0 , 0 );
                return;
            }

            // sendNextBatch returns true while the cursor still has more data
            if ( cursor->sendNextBatch( r , ntoreturn ) ){
                log(6) << "\t cursor finished: " << id << endl;
                return;
            }

            // exhausted: free it and drop it from the cache
            delete( cursor );
            cursorCache.remove( id );
        }
manager->hasShardKey( o ); + } + + if ( bad ){ + log() << "tried to insert object without shard key: " << r.getns() << " " << o << endl; + throw UserException( 8011 , "tried to insert object without shard key" ); + } + + } + + Chunk& c = manager->findChunk( o ); + log(4) << " server:" << c.getShard() << " " << o << endl; + insert( c.getShard() , r.getns() , o ); + + c.splitIfShould( o.objsize() ); + } + } + + void _update( Request& r , DbMessage& d, ChunkManager* manager ){ + int flags = d.pullInt(); + + BSONObj query = d.nextJsObj(); + uassert( 10201 , "invalid update" , d.moreJSObjs() ); + BSONObj toupdate = d.nextJsObj(); + + BSONObj chunkFinder = query; + + bool upsert = flags & UpdateOption_Upsert; + bool multi = flags & UpdateOption_Multi; + + if ( multi ) + uassert( 10202 , "can't mix multi and upsert and sharding" , ! upsert ); + + if ( upsert && !(manager->hasShardKey(toupdate) || + (toupdate.firstElement().fieldName()[0] == '$' && manager->hasShardKey(query)))) + { + throw UserException( 8012 , "can't upsert something without shard key" ); + } + + bool save = false; + if ( ! manager->hasShardKey( query ) ){ + if ( multi ){ + } + else if ( query.nFields() != 1 || strcmp( query.firstElement().fieldName() , "_id" ) ){ + throw UserException( 8013 , "can't do update with query that doesn't have the shard key" ); + } + else { + save = true; + chunkFinder = toupdate; + } + } + + + if ( ! save ){ + if ( toupdate.firstElement().fieldName()[0] == '$' ){ + // TODO: check for $set, etc.. on shard key + } + else if ( manager->hasShardKey( toupdate ) && manager->getShardKey().compare( query , toupdate ) ){ + throw UserException( 8014 , "change would move shards!" 
); + } + } + + if ( multi ){ + vector<Chunk*> chunks; + manager->getChunksForQuery( chunks , chunkFinder ); + set<string> seen; + for ( vector<Chunk*>::iterator i=chunks.begin(); i!=chunks.end(); i++){ + Chunk * c = *i; + if ( seen.count( c->getShard() ) ) + continue; + doWrite( dbUpdate , r , c->getShard() ); + seen.insert( c->getShard() ); + } + } + else { + Chunk& c = manager->findChunk( chunkFinder ); + doWrite( dbUpdate , r , c.getShard() ); + c.splitIfShould( d.msg().data->dataLen() ); + } + + } + + void _delete( Request& r , DbMessage& d, ChunkManager* manager ){ + + int flags = d.pullInt(); + bool justOne = flags & 1; + + uassert( 10203 , "bad delete message" , d.moreJSObjs() ); + BSONObj pattern = d.nextJsObj(); + + vector<Chunk*> chunks; + manager->getChunksForQuery( chunks , pattern ); + cout << "delete : " << pattern << " \t " << chunks.size() << " justOne: " << justOne << endl; + if ( chunks.size() == 1 ){ + doWrite( dbDelete , r , chunks[0]->getShard() ); + return; + } + + if ( justOne && ! 
pattern.hasField( "_id" ) ) + throw UserException( 8015 , "can only delete with a non-shard key pattern if can delete as many as we find" ); + + set<string> seen; + for ( vector<Chunk*>::iterator i=chunks.begin(); i!=chunks.end(); i++){ + Chunk * c = *i; + if ( seen.count( c->getShard() ) ) + continue; + seen.insert( c->getShard() ); + doWrite( dbDelete , r , c->getShard() ); + } + } + + virtual void writeOp( int op , Request& r ){ + const char *ns = r.getns(); + log(3) << "write: " << ns << endl; + + DbMessage& d = r.d(); + ChunkManager * info = r.getChunkManager(); + assert( info ); + + if ( op == dbInsert ){ + _insert( r , d , info ); + } + else if ( op == dbUpdate ){ + _update( r , d , info ); + } + else if ( op == dbDelete ){ + _delete( r , d , info ); + } + else { + log() << "sharding can't do write op: " << op << endl; + throw UserException( 8016 , "can't do this write op on sharded collection" ); + } + + } + }; + + Strategy * SHARDED = new ShardStrategy(); +} diff --git a/s/strategy_single.cpp b/s/strategy_single.cpp new file mode 100644 index 0000000..9cf8a63 --- /dev/null +++ b/s/strategy_single.cpp @@ -0,0 +1,131 @@ +// strategy_simple.cpp + +#include "stdafx.h" +#include "request.h" +#include "../client/connpool.h" +#include "../db/commands.h" + +namespace mongo { + + class SingleStrategy : public Strategy { + + public: + SingleStrategy(){ + _commandsSafeToPass.insert( "$eval" ); + _commandsSafeToPass.insert( "create" ); + } + + private: + virtual void queryOp( Request& r ){ + QueryMessage q( r.d() ); + + bool lateAssert = false; + + log(3) << "single query: " << q.ns << " " << q.query << " ntoreturn: " << q.ntoreturn << endl; + + try { + if ( ( q.ntoreturn == -1 || q.ntoreturn == 1 ) && strstr(q.ns, ".$cmd") ) { + BSONObjBuilder builder; + bool ok = Command::runAgainstRegistered(q.ns, q.query, builder); + if ( ok ) { + BSONObj x = builder.done(); + replyToQuery(0, r.p(), r.m(), x); + return; + } + + string commandName = 
        /** Pass an OP_GET_MORE straight through to the single backing server
            and relay its response to the client. */
        virtual void getMore( Request& r ){
            const char *ns = r.getns();

            log(3) << "single getmore: " << ns << endl;

            ScopedDbConnection dbcon( r.singleServerName() );
            DBClientBase& _c = dbcon.conn();

            // TODO
            // cast is needed because only DBClientConnection exposes the raw
            // port; presumably fails for paired connections — confirm
            DBClientConnection &c = dynamic_cast<DBClientConnection&>(_c);

            Message response;
            bool ok = c.port().call( r.m() , response);
            uassert( 10204 , "dbgrid: getmore: error calling db", ok);
            r.reply( response );

            dbcon.done();

        }
        /** Route a write: writes to "<db>.system.indexes" in a
            sharding-enabled database get special handling (indexes may need
            to be created on every shard); everything else goes to the
            single backing server. */
        virtual void writeOp( int op , Request& r ){
            const char *ns = r.getns();

            // the two strstr calls compare equal exactly when the FIRST '.'
            // in ns begins ".system.indexes", i.e. ns is "<db>.system.indexes";
            // the trailing strstr guards against ns having no '.' at all
            if ( r.isShardingEnabled() &&
                 strstr( ns , ".system.indexes" ) == strstr( ns , "." ) &&
                 strstr( ns , "." ) ){
                log(1) << " .system.indexes write for: " << ns << endl;
                handleIndexWrite( op , r );
                return;
            }

            log(3) << "single write: " << ns << endl;
            doWrite( op , r , r.singleServerName() );
        }
+*/ + +#pragma once + +#include "../stdafx.h" +#include "../client/dbclient.h" + +/** + some generic sharding utils that can be used in mongod or mongos + */ + +namespace mongo { + + /** + your config info for a given shard/chunk is out of date */ + class StaleConfigException : public std::exception { + public: + StaleConfigException( const string& ns , const string& msg){ + stringstream s; + s << "StaleConfigException ns: " << ns << " " << msg; + _msg = s.str(); + } + + virtual ~StaleConfigException() throw(){} + + virtual const char* what() const throw(){ + return _msg.c_str(); + } + private: + string _msg; + }; + + void checkShardVersion( DBClientBase & conn , const string& ns , bool authoritative = false ); + +} |