Diffstat (limited to 's/d_chunk_manager.cpp')
-rw-r--r--  s/d_chunk_manager.cpp  328
1 files changed, 328 insertions, 0 deletions
diff --git a/s/d_chunk_manager.cpp b/s/d_chunk_manager.cpp
new file mode 100644
index 0000000..d4fea30
--- /dev/null
+++ b/s/d_chunk_manager.cpp
@@ -0,0 +1,328 @@
+// @file d_chunk_manager.cpp
+
+/**
+* Copyright (C) 2010 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "pch.h"
+
+#include "../client/connpool.h"
+#include "../client/dbclientmockcursor.h"
+#include "../db/instance.h"
+
+#include "d_chunk_manager.h"
+
+namespace mongo {
+
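+    // loads this shard's view of collection 'ns' from the config server: the shard key pattern
+    // from config.collections and the chunks owned by 'shardName' from config.chunks
+    // an empty 'configServer' means this process is the config server itself, so a direct
+    // client is used instead of a network connection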
+ ShardChunkManager::ShardChunkManager( const string& configServer , const string& ns , const string& shardName ) {
+
+ // have to get a connection to the config db
+        // special case: if this shard is the config db itself, it is already locked, and
+        // connecting to itself would deadlock, so use a direct client instead
+ scoped_ptr<ScopedDbConnection> scoped;
+ scoped_ptr<DBDirectClient> direct;
+ DBClientBase * conn;
+ if ( configServer.empty() ) {
+ direct.reset( new DBDirectClient() );
+ conn = direct.get();
+ }
+ else {
+ scoped.reset( new ScopedDbConnection( configServer ) );
+ conn = scoped->get();
+ }
+
+ // get this collection's sharding key
+ BSONObj collectionDoc = conn->findOne( "config.collections", BSON( "_id" << ns ) );
+ uassert( 13539 , str::stream() << ns << " does not exist" , !collectionDoc.isEmpty() );
+ uassert( 13540 , str::stream() << ns << " collection config entry corrupted" , collectionDoc["dropped"].type() );
+ uassert( 13541 , str::stream() << ns << " dropped. Re-shard collection first." , !collectionDoc["dropped"].Bool() );
+ _fillCollectionKey( collectionDoc );
+
+ // query for all the chunks for 'ns' that live in this shard, sorting so we can efficiently bucket them
+ BSONObj q = BSON( "ns" << ns << "shard" << shardName );
+ auto_ptr<DBClientCursor> cursor = conn->query( "config.chunks" , Query(q).sort( "min" ) );
+ _fillChunks( cursor.get() );
+ _fillRanges();
+
+ if ( scoped.get() )
+ scoped->done();
+
+ if ( _chunksMap.empty() )
+ log() << "no chunk for collection " << ns << " on shard " << shardName << endl;
+ }
+
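+    // builds a manager directly from in-memory collection and chunk documents (e.g., for tests),
+    // without contacting the config server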
+ ShardChunkManager::ShardChunkManager( const BSONObj& collectionDoc , const BSONArray& chunksArr ) {
+ _fillCollectionKey( collectionDoc );
+
+ scoped_ptr<DBClientMockCursor> c ( new DBClientMockCursor( chunksArr ) );
+ _fillChunks( c.get() );
+ _fillRanges();
+ }
+
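+    // extracts the shard key pattern from a config.collections document into _key, normalizing
+    // every key field's value to 1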
+ void ShardChunkManager::_fillCollectionKey( const BSONObj& collectionDoc ) {
+ BSONElement e = collectionDoc["key"];
+ uassert( 13542 , str::stream() << "collection doesn't have a key: " << collectionDoc , ! e.eoo() && e.isABSONObj() );
+
+ BSONObj keys = e.Obj().getOwned();
+ BSONObjBuilder b;
+ BSONForEach( key , keys ) {
+ b.append( key.fieldName() , 1 );
+ }
+ _key = b.obj();
+ }
+
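+    // walks a cursor over config.chunks documents, storing each chunk's [min, max) bounds in
+    // _chunksMap and keeping the highest 'lastmod' version seen in _version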
+ void ShardChunkManager::_fillChunks( DBClientCursorInterface* cursor ) {
+ assert( cursor );
+
+ ShardChunkVersion version;
+ while ( cursor->more() ) {
+ BSONObj d = cursor->next();
+ _chunksMap.insert( make_pair( d["min"].Obj().getOwned() , d["max"].Obj().getOwned() ) );
+
+ ShardChunkVersion currVersion( d["lastmod"] );
+ if ( currVersion > version ) {
+ version = currVersion;
+ }
+ }
+ _version = version;
+ }
+
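+    // collapses the key-ordered _chunksMap into _rangesMap by merging chunks with adjacent
+    // bounds, so that ownership checks scan fewer entries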
+ void ShardChunkManager::_fillRanges() {
+ if ( _chunksMap.empty() )
+ return;
+
+        // load the chunk information, coalescing their ranges
+ // the version for this shard would be the highest version for any of the chunks
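+        // e.g., chunks [0,10) [10,20) [30,40) coalesce into the ranges [0,20) and [30,40)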
+ RangeMap::const_iterator it = _chunksMap.begin();
+ BSONObj min,max;
+ while ( it != _chunksMap.end() ) {
+ BSONObj currMin = it->first;
+ BSONObj currMax = it->second;
+ ++it;
+
+            // coalesce the chunk's bounds into ranges when chunks are adjacent
+ if ( min.isEmpty() ) {
+ min = currMin;
+ max = currMax;
+ continue;
+ }
+ if ( max == currMin ) {
+ max = currMax;
+ continue;
+ }
+
+ _rangesMap.insert( make_pair( min , max ) );
+
+ min = currMin;
+ max = currMax;
+ }
+ assert( ! min.isEmpty() );
+
+ _rangesMap.insert( make_pair( min , max ) );
+ }
+
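+    // returns true if 'point' falls within the half-open interval [min, max)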
+ static bool contains( const BSONObj& min , const BSONObj& max , const BSONObj& point ) {
+ return point.woCompare( min ) >= 0 && point.woCompare( max ) < 0;
+ }
+
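+    // returns true if the shard key value extracted from 'obj' falls inside one of the
+    // ranges this shard currently owns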
+ bool ShardChunkManager::belongsToMe( const BSONObj& obj ) const {
+ if ( _rangesMap.size() == 0 )
+ return false;
+
+ BSONObj x = obj.extractFields(_key);
+
+ RangeMap::const_iterator it = _rangesMap.upper_bound( x );
+ if ( it != _rangesMap.begin() )
+ it--;
+
+ bool good = contains( it->first , it->second , x );
+
+#if 0
+ if ( ! good ) {
+ log() << "bad: " << x << " " << it->first << " " << x.woCompare( it->first ) << " " << x.woCompare( it->second ) << endl;
+ for ( RangeMap::const_iterator i=_rangesMap.begin(); i!=_rangesMap.end(); ++i ) {
+ log() << "\t" << i->first << "\t" << i->second << "\t" << endl;
+ }
+ }
+#endif
+
+ return good;
+ }
+
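+    // fills foundMin/foundMax with the first chunk if 'lookupKey' is empty, or with the chunk
+    // that follows 'lookupKey' otherwise; returns true when there are no further chunks to visit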
+ bool ShardChunkManager::getNextChunk( const BSONObj& lookupKey, BSONObj* foundMin , BSONObj* foundMax ) const {
+ assert( foundMin );
+ assert( foundMax );
+ *foundMin = BSONObj();
+ *foundMax = BSONObj();
+
+ if ( _chunksMap.empty() ) {
+ return true;
+ }
+
+ RangeMap::const_iterator it;
+ if ( lookupKey.isEmpty() ) {
+ it = _chunksMap.begin();
+ *foundMin = it->first;
+ *foundMax = it->second;
+ return _chunksMap.size() == 1;
+ }
+
+ it = _chunksMap.upper_bound( lookupKey );
+ if ( it != _chunksMap.end() ) {
+ *foundMin = it->first;
+ *foundMax = it->second;
+ return false;
+ }
+
+ return true;
+ }
+
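+    // asserts that this shard currently tracks a chunk with exactly the bounds [min, max)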
+ void ShardChunkManager::_assertChunkExists( const BSONObj& min , const BSONObj& max ) const {
+ RangeMap::const_iterator it = _chunksMap.find( min );
+ if ( it == _chunksMap.end() ) {
+ uasserted( 13586 , str::stream() << "couldn't find chunk " << min << "->" << max );
+ }
+
+ if ( it->second.woCompare( max ) != 0 ) {
+ ostringstream os;
+ os << "ranges differ, "
+ << "requested: " << min << " -> " << max << " "
+               << "existing: " << ( it == _chunksMap.end() ? "<empty>" : it->first.toString() + " -> " + it->second.toString() );
+ uasserted( 13587 , os.str() );
+ }
+ }
+
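+    // returns a new manager equal to this one minus the chunk [min, max), stamped with 'version';
+    // removing the last chunk requires version == 0, otherwise 'version' must be greater than
+    // the current version; caller owns the returned object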
+ ShardChunkManager* ShardChunkManager::cloneMinus( const BSONObj& min, const BSONObj& max, const ShardChunkVersion& version ) {
+
+ // check that we have the exact chunk that'll be subtracted
+ _assertChunkExists( min , max );
+
+ auto_ptr<ShardChunkManager> p( new ShardChunkManager );
+ p->_key = this->_key;
+
+ if ( _chunksMap.size() == 1 ) {
+ // if left with no chunks, just reset version
+ uassert( 13590 , str::stream() << "setting version to " << version << " on removing last chunk", version == 0 );
+
+ p->_version = 0;
+
+ }
+ else {
+ // can't move version backwards when subtracting chunks
+ // this is what guarantees that no read or write would be taken once we subtract data from the current shard
+ if ( version <= _version ) {
+ uasserted( 13585 , str::stream() << "version " << version.toString() << " not greater than " << _version.toString() );
+ }
+
+ p->_chunksMap = this->_chunksMap;
+ p->_chunksMap.erase( min );
+ p->_version = version;
+ p->_fillRanges();
+ }
+
+ return p.release();
+ }
+
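+    // returns true if the half-open intervals [l1, h1) and [l2, h2) intersect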
+ static bool overlap( const BSONObj& l1 , const BSONObj& h1 , const BSONObj& l2 , const BSONObj& h2 ) {
+ return ! ( ( h1.woCompare( l2 ) <= 0 ) || ( h2.woCompare( l1 ) <= 0 ) );
+ }
+
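+    // returns a new manager equal to this one plus the chunk [min, max), stamped with the
+    // non-zero 'version'; the new chunk may not overlap any existing chunk; caller owns the
+    // returned object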
+ ShardChunkManager* ShardChunkManager::clonePlus( const BSONObj& min , const BSONObj& max , const ShardChunkVersion& version ) {
+
+ // it is acceptable to move version backwards (e.g., undoing a migration that went bad during commit)
+ // but only cloning away the last chunk may reset the version to 0
+ uassert( 13591 , "version can't be set to zero" , version > 0 );
+
+ if ( ! _chunksMap.empty() ) {
+
+ // check that there isn't any chunk on the interval to be added
+ RangeMap::const_iterator it = _chunksMap.lower_bound( max );
+ if ( it != _chunksMap.begin() ) {
+ --it;
+ }
+ if ( overlap( min , max , it->first , it->second ) ) {
+ ostringstream os;
+ os << "ranges overlap, "
+ << "requested: " << min << " -> " << max << " "
+ << "existing: " << it->first.toString() + " -> " + it->second.toString();
+ uasserted( 13588 , os.str() );
+ }
+ }
+
+ auto_ptr<ShardChunkManager> p( new ShardChunkManager );
+
+ p->_key = this->_key;
+ p->_chunksMap = this->_chunksMap;
+ p->_chunksMap.insert( make_pair( min.getOwned() , max.getOwned() ) );
+ p->_version = version;
+ p->_fillRanges();
+
+ return p.release();
+ }
+
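+    // returns a new manager in which the chunk [min, max) is split at each key in 'splitKeys';
+    // the manager's version is bumped by one minor increment per split point; caller owns the
+    // returned object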
+ ShardChunkManager* ShardChunkManager::cloneSplit( const BSONObj& min , const BSONObj& max , const vector<BSONObj>& splitKeys ,
+ const ShardChunkVersion& version ) {
+
+ // the version required in both resulting chunks could be simply an increment in the minor portion of the current version
+ // however, we are enforcing uniqueness over the attributes <ns, lastmod> of the configdb collection 'chunks'
+ // so in practice, a migrate somewhere may force this split to pick up a version that has the major portion higher
+ // than the one that this shard has been using
+ //
+        // TODO drop the uniqueness constraint and tighten the check below so that only the minor portion of version changes
+ if ( version <= _version ) {
+ uasserted( 13592 , str::stream() << "version " << version.toString() << " not greater than " << _version.toString() );
+ }
+
+ // check that we have the exact chunk that'll be split and that the split point is valid
+ _assertChunkExists( min , max );
+ for ( vector<BSONObj>::const_iterator it = splitKeys.begin() ; it != splitKeys.end() ; ++it ) {
+ if ( ! contains( min , max , *it ) ) {
+                uasserted( 13593 , str::stream() << "cannot split " << min << " -> " << max << " on " << *it );
+ }
+ }
+
+ auto_ptr<ShardChunkManager> p( new ShardChunkManager );
+
+ p->_key = this->_key;
+ p->_chunksMap = this->_chunksMap;
+ p->_version = version; // will increment second, third, ... chunks below
+
+ BSONObj startKey = min;
+ for ( vector<BSONObj>::const_iterator it = splitKeys.begin() ; it != splitKeys.end() ; ++it ) {
+ BSONObj split = *it;
+            p->_chunksMap[startKey] = split.getOwned();
+ p->_chunksMap.insert( make_pair( split.getOwned() , max.getOwned() ) );
+ p->_version.incMinor();
+ startKey = split;
+ }
+ p->_fillRanges();
+
+ return p.release();
+ }
+
+ string ShardChunkManager::toString() const {
+ StringBuilder ss;
+ ss << " ShardChunkManager version: " << _version << " key: " << _key;
+ bool first = true;
+ for ( RangeMap::const_iterator i=_rangesMap.begin(); i!=_rangesMap.end(); ++i ) {
+ if ( first ) first = false;
+ else ss << " , ";
+
+ ss << i->first << " -> " << i->second;
+ }
+ return ss.str();
+ }
+
+} // namespace mongo