diff options
author | Antonin Kral <a.kral@bobek.cz> | 2010-01-31 08:32:52 +0100 |
---|---|---|
committer | Antonin Kral <a.kral@bobek.cz> | 2010-01-31 08:32:52 +0100 |
commit | 4eefaf421bfeddf040d96a3dafb12e09673423d7 (patch) | |
tree | cb2e5ccc7f98158894f977ff131949da36673591 /s/strategy_shard.cpp | |
download | mongodb-4eefaf421bfeddf040d96a3dafb12e09673423d7.tar.gz |
Imported Upstream version 1.3.1
Diffstat (limited to 's/strategy_shard.cpp')
-rw-r--r-- | s/strategy_shard.cpp | 260 |
1 files changed, 260 insertions, 0 deletions
diff --git a/s/strategy_shard.cpp b/s/strategy_shard.cpp new file mode 100644 index 0000000..34cf226 --- /dev/null +++ b/s/strategy_shard.cpp @@ -0,0 +1,260 @@ +// strategy_sharded.cpp + +#include "stdafx.h" +#include "request.h" +#include "chunk.h" +#include "cursors.h" +#include "../client/connpool.h" +#include "../db/commands.h" + +// error codes 8010-8040 + +namespace mongo { + + class ShardStrategy : public Strategy { + + virtual void queryOp( Request& r ){ + QueryMessage q( r.d() ); + + log(3) << "shard query: " << q.ns << " " << q.query << endl; + + if ( q.ntoreturn == 1 && strstr(q.ns, ".$cmd") ) + throw UserException( 8010 , "something is wrong, shouldn't see a command here" ); + + ChunkManager * info = r.getChunkManager(); + assert( info ); + + Query query( q.query ); + + vector<Chunk*> shards; + info->getChunksForQuery( shards , query.getFilter() ); + + set<ServerAndQuery> servers; + map<string,int> serverCounts; + for ( vector<Chunk*>::iterator i = shards.begin(); i != shards.end(); i++ ){ + servers.insert( ServerAndQuery( (*i)->getShard() , (*i)->getFilter() ) ); + int& num = serverCounts[(*i)->getShard()]; + num++; + } + + if ( logLevel > 4 ){ + StringBuilder ss; + ss << " shard query servers: " << servers.size() << "\n"; + for ( set<ServerAndQuery>::iterator i = servers.begin(); i!=servers.end(); i++ ){ + const ServerAndQuery& s = *i; + ss << " " << s.toString() << "\n"; + } + log() << ss.str(); + } + + ClusteredCursor * cursor = 0; + + BSONObj sort = query.getSort(); + + if ( sort.isEmpty() ){ + // 1. no sort, can just hit them in serial + cursor = new SerialServerClusteredCursor( servers , q ); + } + else { + int shardKeyOrder = info->getShardKey().canOrder( sort ); + if ( shardKeyOrder ){ + // 2. sort on shard key, can do in serial intelligently + set<ServerAndQuery> buckets; + for ( vector<Chunk*>::iterator i = shards.begin(); i != shards.end(); i++ ){ + Chunk * s = *i; + buckets.insert( ServerAndQuery( s->getShard() , s->getFilter() , s->getMin() ) ); + } + cursor = new SerialServerClusteredCursor( buckets , q , shardKeyOrder ); + } + else { + // 3. sort on non-sharded key, pull back a portion from each server and iterate slowly + cursor = new ParallelSortClusteredCursor( servers , q , sort ); + } + } + + assert( cursor ); + + log(5) << " cursor type: " << cursor->type() << endl; + + ShardedClientCursor * cc = new ShardedClientCursor( q , cursor ); + if ( ! cc->sendNextBatch( r ) ){ + delete( cursor ); + return; + } + log(6) << "storing cursor : " << cc->getId() << endl; + cursorCache.store( cc ); + } + + virtual void getMore( Request& r ){ + int ntoreturn = r.d().pullInt(); + long long id = r.d().pullInt64(); + + log(6) << "want cursor : " << id << endl; + + ShardedClientCursor * cursor = cursorCache.get( id ); + if ( ! cursor ){ + log(6) << "\t invalid cursor :(" << endl; + replyToQuery( QueryResult::ResultFlag_CursorNotFound , r.p() , r.m() , 0 , 0 , 0 ); + return; + } + + if ( cursor->sendNextBatch( r , ntoreturn ) ){ + log(6) << "\t cursor finished: " << id << endl; + return; + } + + delete( cursor ); + cursorCache.remove( id ); + } + + void _insert( Request& r , DbMessage& d, ChunkManager* manager ){ + + while ( d.moreJSObjs() ){ + BSONObj o = d.nextJsObj(); + if ( ! manager->hasShardKey( o ) ){ + + bool bad = true; + + if ( manager->getShardKey().partOfShardKey( "_id" ) ){ + BSONObjBuilder b; + b.appendOID( "_id" , 0 , true ); + b.appendElements( o ); + o = b.obj(); + bad = ! manager->hasShardKey( o ); + } + + if ( bad ){ + log() << "tried to insert object without shard key: " << r.getns() << " " << o << endl; + throw UserException( 8011 , "tried to insert object without shard key" ); + } + + } + + Chunk& c = manager->findChunk( o ); + log(4) << " server:" << c.getShard() << " " << o << endl; + insert( c.getShard() , r.getns() , o ); + + c.splitIfShould( o.objsize() ); + } + } + + void _update( Request& r , DbMessage& d, ChunkManager* manager ){ + int flags = d.pullInt(); + + BSONObj query = d.nextJsObj(); + uassert( 10201 , "invalid update" , d.moreJSObjs() ); + BSONObj toupdate = d.nextJsObj(); + + BSONObj chunkFinder = query; + + bool upsert = flags & UpdateOption_Upsert; + bool multi = flags & UpdateOption_Multi; + + if ( multi ) + uassert( 10202 , "can't mix multi and upsert and sharding" , ! upsert ); + + if ( upsert && !(manager->hasShardKey(toupdate) || + (toupdate.firstElement().fieldName()[0] == '$' && manager->hasShardKey(query)))) + { + throw UserException( 8012 , "can't upsert something without shard key" ); + } + + bool save = false; + if ( ! manager->hasShardKey( query ) ){ + if ( multi ){ + } + else if ( query.nFields() != 1 || strcmp( query.firstElement().fieldName() , "_id" ) ){ + throw UserException( 8013 , "can't do update with query that doesn't have the shard key" ); + } + else { + save = true; + chunkFinder = toupdate; + } + } + + + if ( ! save ){ + if ( toupdate.firstElement().fieldName()[0] == '$' ){ + // TODO: check for $set, etc.. on shard key + } + else if ( manager->hasShardKey( toupdate ) && manager->getShardKey().compare( query , toupdate ) ){ + throw UserException( 8014 , "change would move shards!" ); + } + } + + if ( multi ){ + vector<Chunk*> chunks; + manager->getChunksForQuery( chunks , chunkFinder ); + set<string> seen; + for ( vector<Chunk*>::iterator i=chunks.begin(); i!=chunks.end(); i++){ + Chunk * c = *i; + if ( seen.count( c->getShard() ) ) + continue; + doWrite( dbUpdate , r , c->getShard() ); + seen.insert( c->getShard() ); + } + } + else { + Chunk& c = manager->findChunk( chunkFinder ); + doWrite( dbUpdate , r , c.getShard() ); + c.splitIfShould( d.msg().data->dataLen() ); + } + + } + + void _delete( Request& r , DbMessage& d, ChunkManager* manager ){ + + int flags = d.pullInt(); + bool justOne = flags & 1; + + uassert( 10203 , "bad delete message" , d.moreJSObjs() ); + BSONObj pattern = d.nextJsObj(); + + vector<Chunk*> chunks; + manager->getChunksForQuery( chunks , pattern ); + cout << "delete : " << pattern << " \t " << chunks.size() << " justOne: " << justOne << endl; + if ( chunks.size() == 1 ){ + doWrite( dbDelete , r , chunks[0]->getShard() ); + return; + } + + if ( justOne && ! pattern.hasField( "_id" ) ) + throw UserException( 8015 , "can only delete with a non-shard key pattern if can delete as many as we find" ); + + set<string> seen; + for ( vector<Chunk*>::iterator i=chunks.begin(); i!=chunks.end(); i++){ + Chunk * c = *i; + if ( seen.count( c->getShard() ) ) + continue; + seen.insert( c->getShard() ); + doWrite( dbDelete , r , c->getShard() ); + } + } + + virtual void writeOp( int op , Request& r ){ + const char *ns = r.getns(); + log(3) << "write: " << ns << endl; + + DbMessage& d = r.d(); + ChunkManager * info = r.getChunkManager(); + assert( info ); + + if ( op == dbInsert ){ + _insert( r , d , info ); + } + else if ( op == dbUpdate ){ + _update( r , d , info ); + } + else if ( op == dbDelete ){ + _delete( r , d , info ); + } + else { + log() << "sharding can't do write op: " << op << endl; + throw UserException( 8016 , "can't do this write op on sharded collection" ); + } + + } + }; + + Strategy * SHARDED = new ShardStrategy(); +} |