/*
* Copyright (C) 2010 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// strategy_sharded.cpp
#include "pch.h"
#include "request.h"
#include "chunk.h"
#include "cursors.h"
#include "stats.h"
#include "client.h"
#include "../client/connpool.h"
#include "../db/commands.h"
// error codes 8010-8040
namespace mongo {
class ShardStrategy : public Strategy {
/**
 * Handles a query against a sharded collection.
 *
 * Asks the chunk manager which shards the query filter can touch, opens a
 * clustered cursor over them (serial when the query has no sort, parallel
 * sorting otherwise) and sends the first batch to the client.  If
 * sendNextBatch() reports that more data remains, the cursor is stored in
 * cursorCache.  An explain query is answered directly from
 * cursor->explain() and nothing is stored.
 *
 * @param r the incoming request
 * @throws UserException 8010 when the message looks like a command
 *         (ntoreturn == 1 and the namespace contains ".$cmd")
 */
virtual void queryOp( Request& r ) {
    QueryMessage q( r.d() );

    log(3) << "shard query: " << q.ns << " " << q.query << endl;

    // per the error text, commands are not expected on this path
    if ( q.ntoreturn == 1 && strstr(q.ns, ".$cmd") )
        throw UserException( 8010 , "something is wrong, shouldn't see a command here" );

    ChunkManagerPtr info = r.getChunkManager();
    assert( info );

    Query query( q.query );

    // shards that may hold documents matching the filter
    set<Shard> shards;
    info->getShardsForQuery( shards , query.getFilter() );

    // one (connection string, empty extra filter) entry per shard
    set<ServerAndQuery> servers;
    for ( set<Shard>::iterator i = shards.begin(); i != shards.end(); ++i ) {
        servers.insert( ServerAndQuery( i->getConnString() , BSONObj() ) );
    }

    if ( logLevel > 4 ) {
        StringBuilder ss;
        ss << " shard query servers: " << servers.size() << '\n';
        for ( set<ServerAndQuery>::iterator i = servers.begin(); i != servers.end(); ++i ) {
            const ServerAndQuery& s = *i;
            ss << " " << s.toString() << '\n';
        }
        log() << ss.str() << endl;
    }

    ClusteredCursor * cursor = 0;

    BSONObj sort = query.getSort();
    if ( sort.isEmpty() ) {
        cursor = new SerialServerClusteredCursor( servers , q );
    }
    else {
        cursor = new ParallelSortClusteredCursor( servers , q , sort );
    }

    assert( cursor );

    // the raw cursor is still owned here, so free it on every exit from this section
    try {
        cursor->init();

        log(5) << " cursor type: " << cursor->type() << endl;
        shardedCursorTypes.hit( cursor->type() );

        if ( query.isExplain() ) {
            BSONObj explain = cursor->explain();
            replyToQuery( 0 , r.p() , r.m() , explain );
            delete( cursor );
            return;
        }
    }
    catch(...) {
        delete cursor;
        throw;
    }

    // NOTE(review): ShardedClientCursor presumably takes ownership of cursor;
    // it is not deleted in this function past this point - confirm.
    ShardedClientCursorPtr cc (new ShardedClientCursor( q , cursor ));
    if ( ! cc->sendNextBatch( r ) ) {
        // nothing left to fetch, so the cursor is not cached
        return;
    }

    log(6) << "storing cursor : " << cc->getId() << endl;
    cursorCache.store( cc );
}
/**
 * Serves a getMore request for a previously stored sharded cursor.
 *
 * Reads the batch size and cursor id from the message, looks the cursor up
 * in cursorCache and sends the next batch.  An unknown id is answered with
 * ResultFlag_CursorNotFound.  Once sendNextBatch() reports no more data the
 * cursor is removed from the cache; otherwise it is marked as accessed.
 *
 * @param r the incoming request
 */
virtual void getMore( Request& r ) {
    // the two fields are pulled in message order: batch size first, then id
    int batchSize = r.d().pullInt();
    long long cursorId = r.d().pullInt64();

    log(6) << "want cursor : " << cursorId << endl;

    ShardedClientCursorPtr cached = cursorCache.get( cursorId );
    if ( ! cached ) {
        log(6) << "\t invalid cursor :(" << endl;
        replyToQuery( ResultFlag_CursorNotFound , r.p() , r.m() , 0 , 0 , 0 );
        return;
    }

    if ( ! cached->sendNextBatch( r , batchSize ) ) {
        // the cursor is exhausted, drop it from the cache
        cursorCache.remove( cursorId );
        return;
    }

    // more data remains, keep the cursor and record the access
    cached->accessed();
}
/**
 * Inserts every document carried by an insert message into the shard whose
 * chunk contains it.
 *
 * A document without the shard key is rejected, unless "_id" is part of the
 * shard key and prepending an "_id" makes the key complete.  Each document
 * is tried up to maxTries times; a StaleConfigException resets the request,
 * reloads the chunk manager and retries after a growing sleep.
 *
 * @param r       the incoming request
 * @param d       the message being decoded; its remaining objects are consumed
 * @param manager chunk manager for the target collection (reloaded locally on
 *                stale config)
 * @throws UserException 8011 when a document has no shard key
 */
void _insert( Request& r , DbMessage& d, ChunkManagerPtr manager ) {
    while ( d.moreJSObjs() ) {
        BSONObj o = d.nextJsObj();
        if ( ! manager->hasShardKey( o ) ) {

            bool bad = true;

            if ( manager->getShardKey().partOfShardKey( "_id" ) ) {
                // put an _id in front of the document and check again
                // (presumably appendOID(..., 0, true) generates a new OID - confirm)
                BSONObjBuilder b;
                b.appendOID( "_id" , 0 , true );
                b.appendElements( o );
                o = b.obj();
                bad = ! manager->hasShardKey( o );
            }

            if ( bad ) {
                log() << "tried to insert object without shard key: " << r.getns() << " " << o << endl;
                throw UserException( 8011 , "tried to insert object without shard key" );
            }

        }

        // Many operations benefit from having the shard key early in the object
        o = manager->getShardKey().moveToFront(o);

        const int maxTries = 10;

        bool gotThrough = false;
        for ( int i=0; i<maxTries; i++ ) {
            try {
                ChunkPtr c = manager->findChunk( o );
                log(4) << " server:" << c->getShard().toString() << " " << o << endl;
                insert( c->getShard() , r.getns() , o );

                r.gotInsert();
                if ( r.getClientInfo()->autoSplitOk() )
                    c->splitIfShould( o.objsize() );
                gotThrough = true;
                break;
            }
            catch ( StaleConfigException& ) {
                // the bool becomes the log level: 1 for the first half of the tries, 0 afterwards
                log( i < ( maxTries / 2 ) ) << "retrying insert because of StaleConfigException: " << o << endl;
                r.reset();
                manager = r.getChunkManager();
            }
            // only reached after a failed attempt; wait longer each time
            sleepmillis( i * 200 );
        }

        assert( gotThrough );

    }
}
/**
 * Routes an update message to the shard(s) that must apply it.
 *
 * Validation performed before anything is sent:
 *  - "$atomic" in the query is rejected (13506); a missing update object is
 *    rejected (10201).
 *  - upsert: the update object must contain the shard key, or be a
 *    $-operator update whose query contains it (8012); every shard key
 *    element extracted from the query must be an equality match (13465).
 *  - query without the shard key: allowed for multi updates; for non-multi
 *    updates only when the query is exactly one "_id" field, in which case
 *    the chunk is located from the update object; otherwise 8013.
 *  - unless that "_id"-only case applies: $-operator updates may not touch
 *    a shard key field (13123); a replacement object containing the shard
 *    key must compare equal to the query on that key (8014); a replacement
 *    object without the shard key is rejected (12376).
 *
 * Multi updates are flagged with UpdateOption_Broadcast and sent to every
 * shard returned by getShardsForQuery().  Other updates go to the shard of
 * the single matching chunk, retrying up to 5 times on StaleConfigException.
 *
 * @param r       the incoming request
 * @param d       the message being decoded (flags, query, update object)
 * @param manager chunk manager for the target collection (reloaded locally on
 *                stale config)
 */
void _update( Request& r , DbMessage& d, ChunkManagerPtr manager ) {
    int flags = d.pullInt();

    BSONObj query = d.nextJsObj();
    uassert( 13506 , "$atomic not supported sharded" , query["$atomic"].eoo() );
    uassert( 10201 , "invalid update" , d.moreJSObjs() );
    BSONObj toupdate = d.nextJsObj();

    // object used to locate the target chunk(s); the query unless overridden below
    BSONObj chunkFinder = query;

    bool upsert = flags & UpdateOption_Upsert;
    bool multi = flags & UpdateOption_Multi;

    if (upsert) {
        uassert(8012, "can't upsert something without shard key",
                (manager->hasShardKey(toupdate) ||
                 (toupdate.firstElement().fieldName()[0] == '$' && manager->hasShardKey(query))));

        BSONObj key = manager->getShardKey().extractKey(query);
        BSONForEach(e, key) {
            uassert(13465, "shard key in upsert query must be an exact match", getGtLtOp(e) == BSONObj::Equality);
        }
    }

    // true when the query is exactly { _id : ... } and lacks the shard key
    bool save = false;
    if ( ! manager->hasShardKey( query ) ) {
        if ( multi ) {
            // multi updates may omit the shard key; they are broadcast below
        }
        else if ( strcmp( query.firstElement().fieldName() , "_id" ) || query.nFields() != 1 ) {
            throw UserException( 8013 , "can't do non-multi update with query that doesn't have the shard key" );
        }
        else {
            save = true;
            chunkFinder = toupdate;
        }
    }

    if ( ! save ) {
        if ( toupdate.firstElement().fieldName()[0] == '$' ) {
            // $-operator update: no operator may name a shard key field
            BSONObjIterator ops(toupdate);
            while(ops.more()) {
                BSONElement op(ops.next());
                if (op.type() != Object)
                    continue;
                BSONObjIterator fields(op.embeddedObject());
                while(fields.more()) {
                    const string field = fields.next().fieldName();
                    uassert(13123,
                            str::stream() << "Can't modify shard key's value field " << field
                            << " for collection: " << manager->getns(),
                            ! manager->getShardKey().partOfShardKey(field));
                }
            }
        }
        else if ( manager->hasShardKey( toupdate ) ) {
            // replacement object: its shard key must match the query's
            uassert( 8014,
                     str::stream() << "cannot modify shard key for collection: " << manager->getns(),
                     manager->getShardKey().compare( query , toupdate ) == 0 );
        }
        else {
            uasserted(12376,
                      str::stream() << "shard key must be in update object for collection: " << manager->getns() );
        }
    }

    if ( multi ) {
        set<Shard> shards;
        manager->getShardsForQuery( shards , chunkFinder );

        // set the broadcast bit in place in the raw message, in the int that
        // follows the namespace (presumably the flags word - confirm)
        int * x = (int*)(r.d().afterNS());
        x[0] |= UpdateOption_Broadcast;
        for ( set<Shard>::iterator i=shards.begin(); i!=shards.end(); ++i) {
            doWrite( dbUpdate , r , *i , false );
        }
    }
    else {
        int left = 5;
        while ( true ) {
            try {
                ChunkPtr c = manager->findChunk( chunkFinder );
                doWrite( dbUpdate , r , c->getShard() );
                if ( r.getClientInfo()->autoSplitOk() )
                    c->splitIfShould( d.msg().header()->dataLen() );
                break;
            }
            catch ( StaleConfigException& ) {
                // out of retries: rethrow the original exception object
                if ( left <= 0 )
                    throw;
                left--;
                log() << "update failed b/c of StaleConfigException, retrying "
                      << " left:" << left << " ns: " << r.getns() << " query: " << query << endl;
                r.reset( false );
                manager = r.getChunkManager();
            }
        }
    }
}
/**
 * Routes a delete message to the shard(s) that may hold matching documents.
 *
 * When the pattern maps to exactly one shard the delete is sent there and
 * the function returns.  Otherwise the message is flagged with
 * RemoveOption_Broadcast and sent to every shard in the set, which is only
 * allowed when the delete is not "just one" or the pattern has an "_id"
 * field.  Shard resolution (and the single-shard write) is retried up to
 * 5 times on StaleConfigException.
 *
 * @param r       the incoming request
 * @param d       the message being decoded (flags, pattern)
 * @param manager chunk manager for the target collection (reloaded locally on
 *                stale config)
 * @throws UserException 8015 for a justOne delete without "_id" that does
 *         not map to a single shard
 */
void _delete( Request& r , DbMessage& d, ChunkManagerPtr manager ) {

    int flags = d.pullInt();
    bool justOne = flags & 1;

    uassert( 10203 , "bad delete message" , d.moreJSObjs() );
    BSONObj pattern = d.nextJsObj();
    uassert( 13505 , "$atomic not supported sharded" , pattern["$atomic"].eoo() );

    set<Shard> shards;
    int left = 5;

    while ( true ) {
        try {
            manager->getShardsForQuery( shards , pattern );
            log(2) << "delete : " << pattern << " \t " << shards.size() << " justOne: " << justOne << endl;
            if ( shards.size() == 1 ) {
                doWrite( dbDelete , r , *shards.begin() );
                return;
            }
            break;
        }
        catch ( StaleConfigException& ) {
            // out of retries: rethrow the original exception object
            if ( left <= 0 )
                throw;
            left--;
            log() << "delete failed b/c of StaleConfigException, retrying "
                  << " left:" << left << " ns: " << r.getns() << " patt: " << pattern << endl;
            r.reset( false );
            // start from an empty set when resolving shards again
            shards.clear();
            manager = r.getChunkManager();
        }
    }

    if ( justOne && ! pattern.hasField( "_id" ) )
        throw UserException( 8015 , "can only delete with a non-shard key pattern if can delete as many as we find" );

    for ( set<Shard>::iterator i=shards.begin(); i!=shards.end(); ++i) {
        // set the broadcast bit in place in the raw message, in the int that
        // follows the namespace (presumably the flags word - confirm)
        int * x = (int*)(r.d().afterNS());
        x[0] |= RemoveOption_Broadcast;
        doWrite( dbDelete , r , *i , false );
    }
}
virtual void writeOp( int op , Request& r ) {
const char *ns = r.getns();
log(3) << "write: " << ns << endl;
DbMessage& d = r.d();
ChunkManagerPtr info = r.getChunkManager();
assert( info );
if ( op == dbInsert ) {
_insert( r , d , info );
}
else if ( op == dbUpdate ) {
_update( r , d , info );
}
else if ( op == dbDelete ) {
_delete( r , d , info );
}
else {
log() << "sharding can't do write op: " << op << endl;
throw UserException( 8016 , "can't do this write op on sharded collection" );
}
}
};
// Global ShardStrategy instance, created at static-initialization time and
// held through a pointer to its Strategy base class.
Strategy * SHARDED = new ShardStrategy();
}