summary | refs | log | tree | commit | diff
path: root/s/strategy.cpp
diff options
context:
space:
mode:
author Antonin Kral <a.kral@bobek.cz> 2010-08-11 12:38:57 +0200
committer Antonin Kral <a.kral@bobek.cz> 2010-08-11 12:38:57 +0200
commit 7645618fd3914cb8a20561625913c20d49504a49 (patch)
tree 8370f846f58f6d71165b7a0e2eda04648584ec76 /s/strategy.cpp
parent 68c73c3c7608b4c87f07440dc3232801720b1168 (diff)
download mongodb-7645618fd3914cb8a20561625913c20d49504a49.tar.gz
Imported Upstream version 1.6.0
Diffstat (limited to 's/strategy.cpp')
-rw-r--r-- s/strategy.cpp 254
1 files changed, 172 insertions, 82 deletions
diff --git a/s/strategy.cpp b/s/strategy.cpp
index b7277e3..b3c8f5b 100644
--- a/s/strategy.cpp
+++ b/s/strategy.cpp
@@ -16,77 +16,88 @@
// stragegy.cpp
-#include "stdafx.h"
+#include "pch.h"
#include "request.h"
#include "../util/background.h"
#include "../client/connpool.h"
#include "../db/commands.h"
+
#include "server.h"
+#include "grid.h"
namespace mongo {
// ----- Strategy ------
- void Strategy::doWrite( int op , Request& r , string server ){
- ScopedDbConnection dbcon( server );
- DBClientBase &_c = dbcon.conn();
-
- /* TODO FIX - do not case and call DBClientBase::say() */
- DBClientConnection&c = dynamic_cast<DBClientConnection&>(_c);
- c.port().say( r.m() );
-
- dbcon.done();
+ void Strategy::doWrite( int op , Request& r , const Shard& shard , bool checkVersion ){
+ ShardConnection conn( shard , r.getns() );
+ if ( ! checkVersion )
+ conn.donotCheckVersion();
+ else if ( conn.setVersion() ){
+ conn.done();
+ throw StaleConfigException( r.getns() , "doWRite" , true );
+ }
+ conn->say( r.m() );
+ conn.done();
}
-
- void Strategy::doQuery( Request& r , string server ){
+
+ void Strategy::doQuery( Request& r , const Shard& shard ){
try{
- ScopedDbConnection dbcon( server );
+ ShardConnection dbcon( shard , r.getns() );
DBClientBase &c = dbcon.conn();
- checkShardVersion( c , r.getns() );
-
Message response;
bool ok = c.call( r.m(), response);
{
- QueryResult *qr = (QueryResult *) response.data;
- if ( qr->resultFlags() & QueryResult::ResultFlag_ShardConfigStale ){
+ QueryResult *qr = (QueryResult *) response.singleData();
+ if ( qr->resultFlags() & ResultFlag_ShardConfigStale ){
dbcon.done();
throw StaleConfigException( r.getns() , "Strategy::doQuery" );
}
}
uassert( 10200 , "mongos: error calling db", ok);
- r.reply( response );
+ r.reply( response , c.getServerAddress() );
dbcon.done();
}
catch ( AssertionException& e ) {
BSONObjBuilder err;
- err.append("$err", string("mongos: ") + (e.msg.empty() ? "assertion during query" : e.msg));
+ e.getInfo().append( err );
BSONObj errObj = err.done();
- replyToQuery(QueryResult::ResultFlag_ErrSet, r.p() , r.m() , errObj);
+ replyToQuery(ResultFlag_ErrSet, r.p() , r.m() , errObj);
}
}
- void Strategy::insert( string server , const char * ns , const BSONObj& obj ){
- ScopedDbConnection dbcon( server );
- checkShardVersion( dbcon.conn() , ns );
+ void Strategy::insert( const Shard& shard , const char * ns , const BSONObj& obj ){
+ ShardConnection dbcon( shard , ns );
+ if ( dbcon.setVersion() ){
+ dbcon.done();
+ throw StaleConfigException( ns , "for insert" );
+ }
dbcon->insert( ns , obj );
dbcon.done();
}
- map<DBClientBase*,unsigned long long> checkShardVersionLastSequence;
-
class WriteBackListener : public BackgroundJob {
protected:
-
+ string name() { return "WriteBackListener"; }
WriteBackListener( const string& addr ) : _addr( addr ){
- cout << "creating WriteBackListener for: " << addr << endl;
+ log() << "creating WriteBackListener for: " << addr << endl;
}
void run(){
+ OID lastID;
+ lastID.clear();
int secsToSleep = 0;
- while ( 1 ){
+ while ( Shard::isMember( _addr ) ){
+
+ if ( lastID.isSet() ){
+ scoped_lock lk( _seenWritebacksLock );
+ _seenWritebacks.insert( lastID );
+ lastID.clear();
+ }
+
try {
ScopedDbConnection conn( _addr );
@@ -94,7 +105,7 @@ namespace mongo {
{
BSONObjBuilder cmd;
- cmd.appendOID( "writebacklisten" , &serverID );
+ cmd.appendOID( "writebacklisten" , &serverID ); // Command will block for data
if ( ! conn->runCommand( "admin" , cmd.obj() , result ) ){
log() << "writebacklisten command failed! " << result << endl;
conn.done();
@@ -108,15 +119,35 @@ namespace mongo {
BSONObj data = result.getObjectField( "data" );
if ( data.getBoolField( "writeBack" ) ){
string ns = data["ns"].valuestrsafe();
-
+ {
+ BSONElement e = data["id"];
+ if ( e.type() == jstOID )
+ lastID = e.OID();
+ }
int len;
Message m( (void*)data["msg"].binData( len ) , false );
- massert( 10427 , "invalid writeback message" , m.data->valid() );
+ massert( 10427 , "invalid writeback message" , m.header()->valid() );
- grid.getDBConfig( ns )->getChunkManager( ns , true );
+ DBConfigPtr db = grid.getDBConfig( ns );
+ ShardChunkVersion needVersion( data["version"] );
+
+ log(1) << "writeback id: " << lastID << " needVersion : " << needVersion.toString()
+ << " mine : " << db->getChunkManager( ns )->getVersion().toString() << endl;// TODO change to log(3)
+
+ if ( logLevel ) log(1) << debugString( m ) << endl;
+
+ if ( needVersion.isSet() && needVersion <= db->getChunkManager( ns )->getVersion() ){
+ // this means when the write went originally, the version was old
+ // if we're here, it means we've already updated the config, so don't need to do again
+ //db->getChunkManager( ns , true ); // SERVER-1349
+ }
+ else {
+ db->getChunkManager( ns , true );
+ }
Request r( m , 0 );
+ r.init();
r.process();
}
else {
@@ -125,9 +156,13 @@ namespace mongo {
conn.done();
secsToSleep = 0;
+ continue;
}
catch ( std::exception e ){
log() << "WriteBackListener exception : " << e.what() << endl;
+
+ // It's possible this shard was removed
+ Shard::reloadShardInfo();
}
catch ( ... ){
log() << "WriteBackListener uncaught exception!" << endl;
@@ -137,99 +172,154 @@ namespace mongo {
if ( secsToSleep > 10 )
secsToSleep = 0;
}
+
+ log() << "WriteBackListener exiting : address no longer in cluster " << _addr;
+
}
private:
string _addr;
- static map<string,WriteBackListener*> _cache;
+ static map<string,WriteBackListener*> _cache;
+ static mongo::mutex _cacheLock;
+
+ static set<OID> _seenWritebacks;
+ static mongo::mutex _seenWritebacksLock;
+
public:
static void init( DBClientBase& conn ){
+ scoped_lock lk( _cacheLock );
WriteBackListener*& l = _cache[conn.getServerAddress()];
if ( l )
return;
l = new WriteBackListener( conn.getServerAddress() );
l->go();
}
+
+ static void waitFor( const OID& oid ){
+ Timer t;
+ for ( int i=0; i<5000; i++ ){
+ {
+ scoped_lock lk( _seenWritebacksLock );
+ if ( _seenWritebacks.count( oid ) )
+ return;
+ }
+ sleepmillis( 10 );
+ }
+ stringstream ss;
+ ss << "didn't get writeback for: " << oid << " after: " << t.millis() << " ms";
+ uasserted( 13403 , ss.str() );
+ }
};
- map<string,WriteBackListener*> WriteBackListener::_cache;
+ void waitForWriteback( const OID& oid ){
+ WriteBackListener::waitFor( oid );
+ }
+ map<string,WriteBackListener*> WriteBackListener::_cache;
+ mongo::mutex WriteBackListener::_cacheLock("WriteBackListener");
+
+ set<OID> WriteBackListener::_seenWritebacks;
+ mongo::mutex WriteBackListener::_seenWritebacksLock( "WriteBackListener::seen" );
- void checkShardVersion( DBClientBase& conn , const string& ns , bool authoritative ){
+ struct ConnectionShardStatus {
+
+ typedef unsigned long long S;
+
+ ConnectionShardStatus()
+ : _mutex( "ConnectionShardStatus" ){
+ }
+
+ S getSequence( DBClientBase * conn , const string& ns ){
+ scoped_lock lk( _mutex );
+ return _map[conn][ns];
+ }
+
+ void setSequence( DBClientBase * conn , const string& ns , const S& s ){
+ scoped_lock lk( _mutex );
+ _map[conn][ns] = s;
+ }
+
+ void reset( DBClientBase * conn ){
+ scoped_lock lk( _mutex );
+ _map.erase( conn );
+ }
+
+ map<DBClientBase*, map<string,unsigned long long> > _map;
+ mongo::mutex _mutex;
+ } connectionShardStatus;
+
+ void resetShardVersion( DBClientBase * conn ){
+ connectionShardStatus.reset( conn );
+ }
+
+ /**
+ * @return true if had to do something
+ */
+ bool checkShardVersion( DBClientBase& conn , const string& ns , bool authoritative , int tryNumber ){
// TODO: cache, optimize, etc...
WriteBackListener::init( conn );
- DBConfig * conf = grid.getDBConfig( ns );
+ DBConfigPtr conf = grid.getDBConfig( ns );
if ( ! conf )
- return;
+ return false;
- ShardChunkVersion version = 0;
unsigned long long officialSequenceNumber = 0;
-
- if ( conf->isSharded( ns ) ){
- ChunkManager * manager = conf->getChunkManager( ns , authoritative );
+
+ ChunkManagerPtr manager;
+ const bool isSharded = conf->isSharded( ns );
+ if ( isSharded ){
+ manager = conf->getChunkManager( ns , authoritative );
officialSequenceNumber = manager->getSequenceNumber();
- version = manager->getVersion( conn.getServerAddress() );
}
- unsigned long long & sequenceNumber = checkShardVersionLastSequence[ &conn ];
- if ( officialSequenceNumber == sequenceNumber )
- return;
-
- log(2) << " have to set shard version for conn: " << &conn << " ns:" << ns << " my last seq: " << sequenceNumber << " current: " << officialSequenceNumber << endl;
+ unsigned long long sequenceNumber = connectionShardStatus.getSequence(&conn,ns);
+ if ( sequenceNumber == officialSequenceNumber ){
+ return false;
+ }
+
+ ShardChunkVersion version = 0;
+ if ( isSharded ){
+ version = manager->getVersion( Shard::make( conn.getServerAddress() ) );
+ }
+
+ log(2) << " have to set shard version for conn: " << &conn << " ns:" << ns
+ << " my last seq: " << sequenceNumber << " current: " << officialSequenceNumber
+ << " version: " << version << " manager: " << manager.get()
+ << endl;
+
BSONObj result;
if ( setShardVersion( conn , ns , version , authoritative , result ) ){
// success!
log(1) << " setShardVersion success!" << endl;
- sequenceNumber = officialSequenceNumber;
- return;
+ connectionShardStatus.setSequence( &conn , ns , officialSequenceNumber );
+ return true;
}
-
+
log(1) << " setShardVersion failed!\n" << result << endl;
if ( result.getBoolField( "need_authoritative" ) )
massert( 10428 , "need_authoritative set but in authoritative mode already" , ! authoritative );
if ( ! authoritative ){
- checkShardVersion( conn , ns , 1 );
- return;
+ checkShardVersion( conn , ns , 1 , tryNumber + 1 );
+ return true;
}
- log(1) << " setShardVersion failed: " << result << endl;
- massert( 10429 , "setShardVersion failed!" , 0 );
- }
-
- bool setShardVersion( DBClientBase & conn , const string& ns , ShardChunkVersion version , bool authoritative , BSONObj& result ){
-
- BSONObjBuilder cmdBuilder;
- cmdBuilder.append( "setShardVersion" , ns.c_str() );
- cmdBuilder.append( "configdb" , configServer.modelServer() );
- cmdBuilder.appendTimestamp( "version" , version );
- cmdBuilder.appendOID( "serverID" , &serverID );
- if ( authoritative )
- cmdBuilder.appendBool( "authoritative" , 1 );
- BSONObj cmd = cmdBuilder.obj();
-
- log(1) << " setShardVersion " << conn.getServerAddress() << " " << ns << " " << cmd << " " << &conn << endl;
-
- return conn.runCommand( "admin" , cmd , result );
- }
-
- bool lockNamespaceOnServer( const string& server , const string& ns ){
- ScopedDbConnection conn( server );
- bool res = lockNamespaceOnServer( conn.conn() , ns );
- conn.done();
- return res;
- }
+ if ( tryNumber < 4 ){
+ log(1) << "going to retry checkShardVersion" << endl;
+ sleepmillis( 10 );
+ checkShardVersion( conn , ns , 1 , tryNumber + 1 );
+ return true;
+ }
- bool lockNamespaceOnServer( DBClientBase& conn , const string& ns ){
- BSONObj lockResult;
- return setShardVersion( conn , ns , grid.getNextOpTime() , true , lockResult );
+ log() << " setShardVersion failed: " << result << endl;
+ massert( 10429 , (string)"setShardVersion failed! " + result.jsonString() , 0 );
+ return true;
}
-
+
}