summaryrefslogtreecommitdiff
path: root/s/strategy_shard.cpp
diff options
context:
space:
mode:
authorAntonin Kral <a.kral@bobek.cz>2010-01-31 08:32:52 +0100
committerAntonin Kral <a.kral@bobek.cz>2010-01-31 08:32:52 +0100
commit4eefaf421bfeddf040d96a3dafb12e09673423d7 (patch)
treecb2e5ccc7f98158894f977ff131949da36673591 /s/strategy_shard.cpp
downloadmongodb-4eefaf421bfeddf040d96a3dafb12e09673423d7.tar.gz
Imported Upstream version 1.3.1
Diffstat (limited to 's/strategy_shard.cpp')
-rw-r--r--s/strategy_shard.cpp260
1 files changed, 260 insertions, 0 deletions
diff --git a/s/strategy_shard.cpp b/s/strategy_shard.cpp
new file mode 100644
index 0000000..34cf226
--- /dev/null
+++ b/s/strategy_shard.cpp
@@ -0,0 +1,260 @@
+// strategy_sharded.cpp
+
+#include "stdafx.h"
+#include "request.h"
+#include "chunk.h"
+#include "cursors.h"
+#include "../client/connpool.h"
+#include "../db/commands.h"
+
+// error codes 8010-8040
+
+namespace mongo {
+
+ class ShardStrategy : public Strategy {
+
+ virtual void queryOp( Request& r ){
+ QueryMessage q( r.d() );
+
+ log(3) << "shard query: " << q.ns << " " << q.query << endl;
+
+ if ( q.ntoreturn == 1 && strstr(q.ns, ".$cmd") )
+ throw UserException( 8010 , "something is wrong, shouldn't see a command here" );
+
+ ChunkManager * info = r.getChunkManager();
+ assert( info );
+
+ Query query( q.query );
+
+ vector<Chunk*> shards;
+ info->getChunksForQuery( shards , query.getFilter() );
+
+ set<ServerAndQuery> servers;
+ map<string,int> serverCounts;
+ for ( vector<Chunk*>::iterator i = shards.begin(); i != shards.end(); i++ ){
+ servers.insert( ServerAndQuery( (*i)->getShard() , (*i)->getFilter() ) );
+ int& num = serverCounts[(*i)->getShard()];
+ num++;
+ }
+
+ if ( logLevel > 4 ){
+ StringBuilder ss;
+ ss << " shard query servers: " << servers.size() << "\n";
+ for ( set<ServerAndQuery>::iterator i = servers.begin(); i!=servers.end(); i++ ){
+ const ServerAndQuery& s = *i;
+ ss << " " << s.toString() << "\n";
+ }
+ log() << ss.str();
+ }
+
+ ClusteredCursor * cursor = 0;
+
+ BSONObj sort = query.getSort();
+
+ if ( sort.isEmpty() ){
+ // 1. no sort, can just hit them in serial
+ cursor = new SerialServerClusteredCursor( servers , q );
+ }
+ else {
+ int shardKeyOrder = info->getShardKey().canOrder( sort );
+ if ( shardKeyOrder ){
+ // 2. sort on shard key, can do in serial intelligently
+ set<ServerAndQuery> buckets;
+ for ( vector<Chunk*>::iterator i = shards.begin(); i != shards.end(); i++ ){
+ Chunk * s = *i;
+ buckets.insert( ServerAndQuery( s->getShard() , s->getFilter() , s->getMin() ) );
+ }
+ cursor = new SerialServerClusteredCursor( buckets , q , shardKeyOrder );
+ }
+ else {
+ // 3. sort on non-sharded key, pull back a portion from each server and iterate slowly
+ cursor = new ParallelSortClusteredCursor( servers , q , sort );
+ }
+ }
+
+ assert( cursor );
+
+ log(5) << " cursor type: " << cursor->type() << endl;
+
+ ShardedClientCursor * cc = new ShardedClientCursor( q , cursor );
+ if ( ! cc->sendNextBatch( r ) ){
+ delete( cursor );
+ return;
+ }
+ log(6) << "storing cursor : " << cc->getId() << endl;
+ cursorCache.store( cc );
+ }
+
+ virtual void getMore( Request& r ){
+ int ntoreturn = r.d().pullInt();
+ long long id = r.d().pullInt64();
+
+ log(6) << "want cursor : " << id << endl;
+
+ ShardedClientCursor * cursor = cursorCache.get( id );
+ if ( ! cursor ){
+ log(6) << "\t invalid cursor :(" << endl;
+ replyToQuery( QueryResult::ResultFlag_CursorNotFound , r.p() , r.m() , 0 , 0 , 0 );
+ return;
+ }
+
+ if ( cursor->sendNextBatch( r , ntoreturn ) ){
+ log(6) << "\t cursor finished: " << id << endl;
+ return;
+ }
+
+ delete( cursor );
+ cursorCache.remove( id );
+ }
+
+ void _insert( Request& r , DbMessage& d, ChunkManager* manager ){
+
+ while ( d.moreJSObjs() ){
+ BSONObj o = d.nextJsObj();
+ if ( ! manager->hasShardKey( o ) ){
+
+ bool bad = true;
+
+ if ( manager->getShardKey().partOfShardKey( "_id" ) ){
+ BSONObjBuilder b;
+ b.appendOID( "_id" , 0 , true );
+ b.appendElements( o );
+ o = b.obj();
+ bad = ! manager->hasShardKey( o );
+ }
+
+ if ( bad ){
+ log() << "tried to insert object without shard key: " << r.getns() << " " << o << endl;
+ throw UserException( 8011 , "tried to insert object without shard key" );
+ }
+
+ }
+
+ Chunk& c = manager->findChunk( o );
+ log(4) << " server:" << c.getShard() << " " << o << endl;
+ insert( c.getShard() , r.getns() , o );
+
+ c.splitIfShould( o.objsize() );
+ }
+ }
+
+ void _update( Request& r , DbMessage& d, ChunkManager* manager ){
+ int flags = d.pullInt();
+
+ BSONObj query = d.nextJsObj();
+ uassert( 10201 , "invalid update" , d.moreJSObjs() );
+ BSONObj toupdate = d.nextJsObj();
+
+ BSONObj chunkFinder = query;
+
+ bool upsert = flags & UpdateOption_Upsert;
+ bool multi = flags & UpdateOption_Multi;
+
+ if ( multi )
+ uassert( 10202 , "can't mix multi and upsert and sharding" , ! upsert );
+
+ if ( upsert && !(manager->hasShardKey(toupdate) ||
+ (toupdate.firstElement().fieldName()[0] == '$' && manager->hasShardKey(query))))
+ {
+ throw UserException( 8012 , "can't upsert something without shard key" );
+ }
+
+ bool save = false;
+ if ( ! manager->hasShardKey( query ) ){
+ if ( multi ){
+ }
+ else if ( query.nFields() != 1 || strcmp( query.firstElement().fieldName() , "_id" ) ){
+ throw UserException( 8013 , "can't do update with query that doesn't have the shard key" );
+ }
+ else {
+ save = true;
+ chunkFinder = toupdate;
+ }
+ }
+
+
+ if ( ! save ){
+ if ( toupdate.firstElement().fieldName()[0] == '$' ){
+ // TODO: check for $set, etc.. on shard key
+ }
+ else if ( manager->hasShardKey( toupdate ) && manager->getShardKey().compare( query , toupdate ) ){
+ throw UserException( 8014 , "change would move shards!" );
+ }
+ }
+
+ if ( multi ){
+ vector<Chunk*> chunks;
+ manager->getChunksForQuery( chunks , chunkFinder );
+ set<string> seen;
+ for ( vector<Chunk*>::iterator i=chunks.begin(); i!=chunks.end(); i++){
+ Chunk * c = *i;
+ if ( seen.count( c->getShard() ) )
+ continue;
+ doWrite( dbUpdate , r , c->getShard() );
+ seen.insert( c->getShard() );
+ }
+ }
+ else {
+ Chunk& c = manager->findChunk( chunkFinder );
+ doWrite( dbUpdate , r , c.getShard() );
+ c.splitIfShould( d.msg().data->dataLen() );
+ }
+
+ }
+
+ void _delete( Request& r , DbMessage& d, ChunkManager* manager ){
+
+ int flags = d.pullInt();
+ bool justOne = flags & 1;
+
+ uassert( 10203 , "bad delete message" , d.moreJSObjs() );
+ BSONObj pattern = d.nextJsObj();
+
+ vector<Chunk*> chunks;
+ manager->getChunksForQuery( chunks , pattern );
+ cout << "delete : " << pattern << " \t " << chunks.size() << " justOne: " << justOne << endl;
+ if ( chunks.size() == 1 ){
+ doWrite( dbDelete , r , chunks[0]->getShard() );
+ return;
+ }
+
+ if ( justOne && ! pattern.hasField( "_id" ) )
+ throw UserException( 8015 , "can only delete with a non-shard key pattern if can delete as many as we find" );
+
+ set<string> seen;
+ for ( vector<Chunk*>::iterator i=chunks.begin(); i!=chunks.end(); i++){
+ Chunk * c = *i;
+ if ( seen.count( c->getShard() ) )
+ continue;
+ seen.insert( c->getShard() );
+ doWrite( dbDelete , r , c->getShard() );
+ }
+ }
+
+ virtual void writeOp( int op , Request& r ){
+ const char *ns = r.getns();
+ log(3) << "write: " << ns << endl;
+
+ DbMessage& d = r.d();
+ ChunkManager * info = r.getChunkManager();
+ assert( info );
+
+ if ( op == dbInsert ){
+ _insert( r , d , info );
+ }
+ else if ( op == dbUpdate ){
+ _update( r , d , info );
+ }
+ else if ( op == dbDelete ){
+ _delete( r , d , info );
+ }
+ else {
+ log() << "sharding can't do write op: " << op << endl;
+ throw UserException( 8016 , "can't do this write op on sharded collection" );
+ }
+
+ }
+ };
+
+ Strategy * SHARDED = new ShardStrategy();
+}