diff options
Diffstat (limited to 'client/syncclusterconnection.cpp')
-rw-r--r-- | client/syncclusterconnection.cpp | 179 |
1 files changed, 148 insertions, 31 deletions
diff --git a/client/syncclusterconnection.cpp b/client/syncclusterconnection.cpp index 0a8fc79..5324b6c 100644 --- a/client/syncclusterconnection.cpp +++ b/client/syncclusterconnection.cpp @@ -16,15 +16,29 @@ */ -#include "stdafx.h" +#include "pch.h" #include "syncclusterconnection.h" #include "../db/dbmessage.h" // error codes 8000-8009 namespace mongo { + + SyncClusterConnection::SyncClusterConnection( const list<HostAndPort> & L) : _mutex("SynClusterConnection") { + { + stringstream s; + int n=0; + for( list<HostAndPort>::const_iterator i = L.begin(); i != L.end(); i++ ) { + if( ++n > 1 ) s << ','; + s << i->toString(); + } + _address = s.str(); + } + for( list<HostAndPort>::const_iterator i = L.begin(); i != L.end(); i++ ) + _connect( i->toString() ); + } - SyncClusterConnection::SyncClusterConnection( string commaSeperated ){ + SyncClusterConnection::SyncClusterConnection( string commaSeperated ) : _mutex("SyncClusterConnection") { _address = commaSeperated; string::size_type idx; while ( ( idx = commaSeperated.find( ',' ) ) != string::npos ){ @@ -36,7 +50,7 @@ namespace mongo { uassert( 8004 , "SyncClusterConnection needs 3 servers" , _conns.size() == 3 ); } - SyncClusterConnection::SyncClusterConnection( string a , string b , string c ){ + SyncClusterConnection::SyncClusterConnection( string a , string b , string c ) : _mutex("SyncClusterConnection") { _address = a + "," + b + "," + c; // connect to all even if not working _connect( a ); @@ -44,7 +58,7 @@ namespace mongo { _connect( c ); } - SyncClusterConnection::SyncClusterConnection( SyncClusterConnection& prev ){ + SyncClusterConnection::SyncClusterConnection( SyncClusterConnection& prev ) : _mutex("SyncClusterConnection") { assert(0); } @@ -55,6 +69,7 @@ namespace mongo { } bool SyncClusterConnection::prepare( string& errmsg ){ + _lastErrors.clear(); return fsync( errmsg ); } @@ -79,7 +94,7 @@ namespace mongo { } void SyncClusterConnection::_checkLast(){ - vector<BSONObj> all; + _lastErrors.clear(); vector<string> errors; for ( size_t i=0; i<_conns.size(); i++ ){ @@ -95,17 +110,17 @@ namespace mongo { catch ( ... ){ err += "unknown failure"; } - all.push_back( res ); + _lastErrors.push_back( res.getOwned() ); errors.push_back( err ); } - - assert( all.size() == errors.size() && all.size() == _conns.size() ); + + assert( _lastErrors.size() == errors.size() && _lastErrors.size() == _conns.size() ); stringstream err; bool ok = true; for ( size_t i = 0; i<_conns.size(); i++ ){ - BSONObj res = all[i]; + BSONObj res = _lastErrors[i]; if ( res["ok"].trueValue() && res["fsyncFiles"].numberInt() > 0 ) continue; ok = false; @@ -117,35 +132,71 @@ namespace mongo { throw UserException( 8001 , (string)"SyncClusterConnection write op failed: " + err.str() ); } + BSONObj SyncClusterConnection::getLastErrorDetailed(){ + if ( _lastErrors.size() ) + return _lastErrors[0]; + return DBClientBase::getLastErrorDetailed(); + } + void SyncClusterConnection::_connect( string host ){ log() << "SyncClusterConnection connecting to [" << host << "]" << endl; DBClientConnection * c = new DBClientConnection( true ); string errmsg; if ( ! c->connect( host , errmsg ) ) log() << "SyncClusterConnection connect fail to: " << host << " errmsg: " << errmsg << endl; + _connAddresses.push_back( host ); _conns.push_back( c ); } - auto_ptr<DBClientCursor> SyncClusterConnection::query(const string &ns, Query query, int nToReturn, int nToSkip, - const BSONObj *fieldsToReturn, int queryOptions, int batchSize ){ + bool SyncClusterConnection::callRead( Message& toSend , Message& response ){ + // TODO: need to save state of which one to go back to somehow... + return _conns[0]->callRead( toSend , response ); + } + BSONObj SyncClusterConnection::findOne(const string &ns, const Query& query, const BSONObj *fieldsToReturn, int queryOptions) { + if ( ns.find( ".$cmd" ) != string::npos ){ string cmdName = query.obj.firstElement().fieldName(); - int lockType = 0; - - map<string,int>::iterator i = _lockTypes.find( cmdName ); - if ( i == _lockTypes.end() ){ - BSONObj info; - uassert( 13053 , "help failed" , _commandOnActive( "admin" , BSON( cmdName << "1" << "help" << 1 ) , info ) ); - lockType = info["lockType"].numberInt(); - _lockTypes[cmdName] = lockType; - } - else { - lockType = i->second; + int lockType = _lockType( cmdName ); + + if ( lockType > 0 ){ // write $cmd + string errmsg; + if ( ! prepare( errmsg ) ) + throw UserException( 13104 , (string)"SyncClusterConnection::findOne prepare failed: " + errmsg ); + + vector<BSONObj> all; + for ( size_t i=0; i<_conns.size(); i++ ){ + all.push_back( _conns[i]->findOne( ns , query , 0 , queryOptions ).getOwned() ); + } + + _checkLast(); + + for ( size_t i=0; i<all.size(); i++ ){ + BSONObj temp = all[i]; + if ( isOk( temp ) ) + continue; + stringstream ss; + ss << "write $cmd failed on a shard: " << temp.jsonString(); + ss << " " << _conns[i]->toString(); + throw UserException( 13105 , ss.str() ); + } + + return all[0]; } - - uassert( 13054 , (string)"write $cmd not supported in SyncClusterConnection: " + cmdName , lockType <= 0 ); + } + + return DBClientBase::findOne( ns , query , fieldsToReturn , queryOptions ); + } + + + auto_ptr<DBClientCursor> SyncClusterConnection::query(const string &ns, Query query, int nToReturn, int nToSkip, + const BSONObj *fieldsToReturn, int queryOptions, int batchSize ){ + _lastErrors.clear(); + if ( ns.find( ".$cmd" ) != string::npos ){ + string cmdName = query.obj.firstElement().fieldName(); + int lockType = _lockType( cmdName ); + uassert( 13054 , (string)"write $cmd not supported in SyncClusterConnection::query for:" + cmdName , lockType <= 0 ); } return _queryOnActive( ns , query , nToReturn , nToSkip , fieldsToReturn , queryOptions , batchSize ); @@ -185,6 +236,10 @@ namespace mongo { } void SyncClusterConnection::insert( const string &ns, BSONObj obj ){ + + uassert( 13119 , (string)"SyncClusterConnection::insert obj has to have an _id: " + obj.jsonString() , + ns.find( ".system.indexes" ) != string::npos || obj["_id"].type() ); + string errmsg; if ( ! prepare( errmsg ) ) throw UserException( 8003 , (string)"SyncClusterConnection::insert prepare failed: " + errmsg ); @@ -201,19 +256,52 @@ namespace mongo { } void SyncClusterConnection::remove( const string &ns , Query query, bool justOne ){ - assert(0); + string errmsg; + if ( ! prepare( errmsg ) ) + throw UserException( 8020 , (string)"SyncClusterConnection::remove prepare failed: " + errmsg ); + + for ( size_t i=0; i<_conns.size(); i++ ){ + _conns[i]->remove( ns , query , justOne ); + } + + _checkLast(); } void SyncClusterConnection::update( const string &ns , Query query , BSONObj obj , bool upsert , bool multi ){ - string errmsg; - if ( ! prepare( errmsg ) ) - throw UserException( 8005 , (string)"SyncClusterConnection::udpate prepare failed: " + errmsg ); + + if ( upsert ){ + uassert( 13120 , "SyncClusterConnection::update upsert query needs _id" , query.obj["_id"].type() ); + } + + if ( _writeConcern ){ + string errmsg; + if ( ! prepare( errmsg ) ) + throw UserException( 8005 , (string)"SyncClusterConnection::udpate prepare failed: " + errmsg ); + } for ( size_t i=0; i<_conns.size(); i++ ){ - _conns[i]->update( ns , query , obj , upsert , multi ); + try { + _conns[i]->update( ns , query , obj , upsert , multi ); + } + catch ( std::exception& e ){ + if ( _writeConcern ) + throw e; + } } - _checkLast(); + if ( _writeConcern ){ + _checkLast(); + assert( _lastErrors.size() > 1 ); + + int a = _lastErrors[0]["n"].numberInt(); + for ( unsigned i=1; i<_lastErrors.size(); i++ ){ + int b = _lastErrors[i]["n"].numberInt(); + if ( a == b ) + continue; + + throw UpdateNotTheSame( 8017 , "update not consistent" , _connAddresses , _lastErrors ); + } + } } string SyncClusterConnection::_toString() const { @@ -244,12 +332,41 @@ namespace mongo { } void SyncClusterConnection::say( Message &toSend ){ - assert(0); + string errmsg; + if ( ! prepare( errmsg ) ) + throw UserException( 13397 , (string)"SyncClusterConnection::say prepare failed: " + errmsg ); + + for ( size_t i=0; i<_conns.size(); i++ ){ + _conns[i]->say( toSend ); + } + + _checkLast(); } void SyncClusterConnection::sayPiggyBack( Message &toSend ){ assert(0); } + int SyncClusterConnection::_lockType( const string& name ){ + { + scoped_lock lk(_mutex); + map<string,int>::iterator i = _lockTypes.find( name ); + if ( i != _lockTypes.end() ) + return i->second; + } + + BSONObj info; + uassert( 13053 , "help failed" , _commandOnActive( "admin" , BSON( name << "1" << "help" << 1 ) , info ) ); + + int lockType = info["lockType"].numberInt(); + scoped_lock lk(_mutex); + _lockTypes[name] = lockType; + return lockType; + } + + void SyncClusterConnection::killCursor( long long cursorID ){ + // should never need to do this + assert(0); + } } |