diff options
author | Antonin Kral <a.kral@bobek.cz> | 2010-08-11 12:38:57 +0200 |
---|---|---|
committer | Antonin Kral <a.kral@bobek.cz> | 2010-08-11 12:38:57 +0200 |
commit | 7645618fd3914cb8a20561625913c20d49504a49 (patch) | |
tree | 8370f846f58f6d71165b7a0e2eda04648584ec76 /s/strategy.cpp | |
parent | 68c73c3c7608b4c87f07440dc3232801720b1168 (diff) | |
download | mongodb-7645618fd3914cb8a20561625913c20d49504a49.tar.gz |
Imported Upstream version 1.6.0
Diffstat (limited to 's/strategy.cpp')
-rw-r--r-- | s/strategy.cpp | 254 |
1 files changed, 172 insertions, 82 deletions
diff --git a/s/strategy.cpp b/s/strategy.cpp index b7277e3..b3c8f5b 100644 --- a/s/strategy.cpp +++ b/s/strategy.cpp @@ -16,77 +16,88 @@ // stragegy.cpp -#include "stdafx.h" +#include "pch.h" #include "request.h" #include "../util/background.h" #include "../client/connpool.h" #include "../db/commands.h" + #include "server.h" +#include "grid.h" namespace mongo { // ----- Strategy ------ - void Strategy::doWrite( int op , Request& r , string server ){ - ScopedDbConnection dbcon( server ); - DBClientBase &_c = dbcon.conn(); - - /* TODO FIX - do not case and call DBClientBase::say() */ - DBClientConnection&c = dynamic_cast<DBClientConnection&>(_c); - c.port().say( r.m() ); - - dbcon.done(); + void Strategy::doWrite( int op , Request& r , const Shard& shard , bool checkVersion ){ + ShardConnection conn( shard , r.getns() ); + if ( ! checkVersion ) + conn.donotCheckVersion(); + else if ( conn.setVersion() ){ + conn.done(); + throw StaleConfigException( r.getns() , "doWRite" , true ); + } + conn->say( r.m() ); + conn.done(); } - - void Strategy::doQuery( Request& r , string server ){ + + void Strategy::doQuery( Request& r , const Shard& shard ){ try{ - ScopedDbConnection dbcon( server ); + ShardConnection dbcon( shard , r.getns() ); DBClientBase &c = dbcon.conn(); - checkShardVersion( c , r.getns() ); - Message response; bool ok = c.call( r.m(), response); { - QueryResult *qr = (QueryResult *) response.data; - if ( qr->resultFlags() & QueryResult::ResultFlag_ShardConfigStale ){ + QueryResult *qr = (QueryResult *) response.singleData(); + if ( qr->resultFlags() & ResultFlag_ShardConfigStale ){ dbcon.done(); throw StaleConfigException( r.getns() , "Strategy::doQuery" ); } } uassert( 10200 , "mongos: error calling db", ok); - r.reply( response ); + r.reply( response , c.getServerAddress() ); dbcon.done(); } catch ( AssertionException& e ) { BSONObjBuilder err; - err.append("$err", string("mongos: ") + (e.msg.empty() ? "assertion during query" : e.msg)); + e.getInfo().append( err ); BSONObj errObj = err.done(); - replyToQuery(QueryResult::ResultFlag_ErrSet, r.p() , r.m() , errObj); + replyToQuery(ResultFlag_ErrSet, r.p() , r.m() , errObj); } } - void Strategy::insert( string server , const char * ns , const BSONObj& obj ){ - ScopedDbConnection dbcon( server ); - checkShardVersion( dbcon.conn() , ns ); + void Strategy::insert( const Shard& shard , const char * ns , const BSONObj& obj ){ + ShardConnection dbcon( shard , ns ); + if ( dbcon.setVersion() ){ + dbcon.done(); + throw StaleConfigException( ns , "for insert" ); + } dbcon->insert( ns , obj ); dbcon.done(); } - map<DBClientBase*,unsigned long long> checkShardVersionLastSequence; - class WriteBackListener : public BackgroundJob { protected: - + string name() { return "WriteBackListener"; } WriteBackListener( const string& addr ) : _addr( addr ){ - cout << "creating WriteBackListener for: " << addr << endl; + log() << "creating WriteBackListener for: " << addr << endl; } void run(){ + OID lastID; + lastID.clear(); int secsToSleep = 0; - while ( 1 ){ + while ( Shard::isMember( _addr ) ){ + + if ( lastID.isSet() ){ + scoped_lock lk( _seenWritebacksLock ); + _seenWritebacks.insert( lastID ); + lastID.clear(); + } + try { ScopedDbConnection conn( _addr ); @@ -94,7 +105,7 @@ namespace mongo { { BSONObjBuilder cmd; - cmd.appendOID( "writebacklisten" , &serverID ); + cmd.appendOID( "writebacklisten" , &serverID ); // Command will block for data if ( ! conn->runCommand( "admin" , cmd.obj() , result ) ){ log() << "writebacklisten command failed! " << result << endl; conn.done(); @@ -108,15 +119,35 @@ namespace mongo { BSONObj data = result.getObjectField( "data" ); if ( data.getBoolField( "writeBack" ) ){ string ns = data["ns"].valuestrsafe(); - + { + BSONElement e = data["id"]; + if ( e.type() == jstOID ) + lastID = e.OID(); + } int len; Message m( (void*)data["msg"].binData( len ) , false ); - massert( 10427 , "invalid writeback message" , m.data->valid() ); + massert( 10427 , "invalid writeback message" , m.header()->valid() ); - grid.getDBConfig( ns )->getChunkManager( ns , true ); + DBConfigPtr db = grid.getDBConfig( ns ); + ShardChunkVersion needVersion( data["version"] ); + + log(1) << "writeback id: " << lastID << " needVersion : " << needVersion.toString() + << " mine : " << db->getChunkManager( ns )->getVersion().toString() << endl;// TODO change to log(3) + + if ( logLevel ) log(1) << debugString( m ) << endl; + + if ( needVersion.isSet() && needVersion <= db->getChunkManager( ns )->getVersion() ){ + // this means when the write went originally, the version was old + // if we're here, it means we've already updated the config, so don't need to do again + //db->getChunkManager( ns , true ); // SERVER-1349 + } + else { + db->getChunkManager( ns , true ); + } Request r( m , 0 ); + r.init(); r.process(); } else { @@ -125,9 +156,13 @@ namespace mongo { conn.done(); secsToSleep = 0; + continue; } catch ( std::exception e ){ log() << "WriteBackListener exception : " << e.what() << endl; + + // It's possible this shard was removed + Shard::reloadShardInfo(); } catch ( ... ){ log() << "WriteBackListener uncaught exception!" << endl; @@ -137,99 +172,154 @@ namespace mongo { if ( secsToSleep > 10 ) secsToSleep = 0; } + + log() << "WriteBackListener exiting : address no longer in cluster " << _addr; + } private: string _addr; - static map<string,WriteBackListener*> _cache; + static map<string,WriteBackListener*> _cache; + static mongo::mutex _cacheLock; + + static set<OID> _seenWritebacks; + static mongo::mutex _seenWritebacksLock; + public: static void init( DBClientBase& conn ){ + scoped_lock lk( _cacheLock ); WriteBackListener*& l = _cache[conn.getServerAddress()]; if ( l ) return; l = new WriteBackListener( conn.getServerAddress() ); l->go(); } + + static void waitFor( const OID& oid ){ + Timer t; + for ( int i=0; i<5000; i++ ){ + { + scoped_lock lk( _seenWritebacksLock ); + if ( _seenWritebacks.count( oid ) ) + return; + } + sleepmillis( 10 ); + } + stringstream ss; + ss << "didn't get writeback for: " << oid << " after: " << t.millis() << " ms"; + uasserted( 13403 , ss.str() ); + } }; - map<string,WriteBackListener*> WriteBackListener::_cache; + void waitForWriteback( const OID& oid ){ + WriteBackListener::waitFor( oid ); + } + map<string,WriteBackListener*> WriteBackListener::_cache; + mongo::mutex WriteBackListener::_cacheLock("WriteBackListener"); + + set<OID> WriteBackListener::_seenWritebacks; + mongo::mutex WriteBackListener::_seenWritebacksLock( "WriteBackListener::seen" ); - void checkShardVersion( DBClientBase& conn , const string& ns , bool authoritative ){ + struct ConnectionShardStatus { + + typedef unsigned long long S; + + ConnectionShardStatus() + : _mutex( "ConnectionShardStatus" ){ + } + + S getSequence( DBClientBase * conn , const string& ns ){ + scoped_lock lk( _mutex ); + return _map[conn][ns]; + } + + void setSequence( DBClientBase * conn , const string& ns , const S& s ){ + scoped_lock lk( _mutex ); + _map[conn][ns] = s; + } + + void reset( DBClientBase * conn ){ + scoped_lock lk( _mutex ); + _map.erase( conn ); + } + + map<DBClientBase*, map<string,unsigned long long> > _map; + mongo::mutex _mutex; + } connectionShardStatus; + + void resetShardVersion( DBClientBase * conn ){ + connectionShardStatus.reset( conn ); + } + + /** + * @return true if had to do something + */ + bool checkShardVersion( DBClientBase& conn , const string& ns , bool authoritative , int tryNumber ){ // TODO: cache, optimize, etc... WriteBackListener::init( conn ); - DBConfig * conf = grid.getDBConfig( ns ); + DBConfigPtr conf = grid.getDBConfig( ns ); if ( ! conf ) - return; + return false; - ShardChunkVersion version = 0; unsigned long long officialSequenceNumber = 0; - - if ( conf->isSharded( ns ) ){ - ChunkManager * manager = conf->getChunkManager( ns , authoritative ); + + ChunkManagerPtr manager; + const bool isSharded = conf->isSharded( ns ); + if ( isSharded ){ + manager = conf->getChunkManager( ns , authoritative ); officialSequenceNumber = manager->getSequenceNumber(); - version = manager->getVersion( conn.getServerAddress() ); } - unsigned long long & sequenceNumber = checkShardVersionLastSequence[ &conn ]; - if ( officialSequenceNumber == sequenceNumber ) - return; - - log(2) << " have to set shard version for conn: " << &conn << " ns:" << ns << " my last seq: " << sequenceNumber << " current: " << officialSequenceNumber << endl; + unsigned long long sequenceNumber = connectionShardStatus.getSequence(&conn,ns); + if ( sequenceNumber == officialSequenceNumber ){ + return false; + } + + ShardChunkVersion version = 0; + if ( isSharded ){ + version = manager->getVersion( Shard::make( conn.getServerAddress() ) ); + } + + log(2) << " have to set shard version for conn: " << &conn << " ns:" << ns + << " my last seq: " << sequenceNumber << " current: " << officialSequenceNumber + << " version: " << version << " manager: " << manager.get() + << endl; + BSONObj result; if ( setShardVersion( conn , ns , version , authoritative , result ) ){ // success! log(1) << " setShardVersion success!" << endl; - sequenceNumber = officialSequenceNumber; - return; + connectionShardStatus.setSequence( &conn , ns , officialSequenceNumber ); + return true; } - + log(1) << " setShardVersion failed!\n" << result << endl; if ( result.getBoolField( "need_authoritative" ) ) massert( 10428 , "need_authoritative set but in authoritative mode already" , ! authoritative ); if ( ! authoritative ){ - checkShardVersion( conn , ns , 1 ); - return; + checkShardVersion( conn , ns , 1 , tryNumber + 1 ); + return true; } - log(1) << " setShardVersion failed: " << result << endl; - massert( 10429 , "setShardVersion failed!" , 0 ); - } - - bool setShardVersion( DBClientBase & conn , const string& ns , ShardChunkVersion version , bool authoritative , BSONObj& result ){ - - BSONObjBuilder cmdBuilder; - cmdBuilder.append( "setShardVersion" , ns.c_str() ); - cmdBuilder.append( "configdb" , configServer.modelServer() ); - cmdBuilder.appendTimestamp( "version" , version ); - cmdBuilder.appendOID( "serverID" , &serverID ); - if ( authoritative ) - cmdBuilder.appendBool( "authoritative" , 1 ); - BSONObj cmd = cmdBuilder.obj(); - - log(1) << " setShardVersion " << conn.getServerAddress() << " " << ns << " " << cmd << " " << &conn << endl; - - return conn.runCommand( "admin" , cmd , result ); - } - - bool lockNamespaceOnServer( const string& server , const string& ns ){ - ScopedDbConnection conn( server ); - bool res = lockNamespaceOnServer( conn.conn() , ns ); - conn.done(); - return res; - } + if ( tryNumber < 4 ){ + log(1) << "going to retry checkShardVersion" << endl; + sleepmillis( 10 ); + checkShardVersion( conn , ns , 1 , tryNumber + 1 ); + return true; + } - bool lockNamespaceOnServer( DBClientBase& conn , const string& ns ){ - BSONObj lockResult; - return setShardVersion( conn , ns , grid.getNextOpTime() , true , lockResult ); + log() << " setShardVersion failed: " << result << endl; + massert( 10429 , (string)"setShardVersion failed! " + result.jsonString() , 0 ); + return true; } - + } |