summaryrefslogtreecommitdiff
path: root/client/syncclusterconnection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'client/syncclusterconnection.cpp')
-rw-r--r--client/syncclusterconnection.cpp179
1 files changed, 148 insertions, 31 deletions
diff --git a/client/syncclusterconnection.cpp b/client/syncclusterconnection.cpp
index 0a8fc79..5324b6c 100644
--- a/client/syncclusterconnection.cpp
+++ b/client/syncclusterconnection.cpp
@@ -16,15 +16,29 @@
*/
-#include "stdafx.h"
+#include "pch.h"
#include "syncclusterconnection.h"
#include "../db/dbmessage.h"
// error codes 8000-8009
namespace mongo {
+
+ SyncClusterConnection::SyncClusterConnection( const list<HostAndPort> & L) : _mutex("SynClusterConnection") {
+ {
+ stringstream s;
+ int n=0;
+ for( list<HostAndPort>::const_iterator i = L.begin(); i != L.end(); i++ ) {
+ if( ++n > 1 ) s << ',';
+ s << i->toString();
+ }
+ _address = s.str();
+ }
+ for( list<HostAndPort>::const_iterator i = L.begin(); i != L.end(); i++ )
+ _connect( i->toString() );
+ }
- SyncClusterConnection::SyncClusterConnection( string commaSeperated ){
+ SyncClusterConnection::SyncClusterConnection( string commaSeperated ) : _mutex("SyncClusterConnection") {
_address = commaSeperated;
string::size_type idx;
while ( ( idx = commaSeperated.find( ',' ) ) != string::npos ){
@@ -36,7 +50,7 @@ namespace mongo {
uassert( 8004 , "SyncClusterConnection needs 3 servers" , _conns.size() == 3 );
}
- SyncClusterConnection::SyncClusterConnection( string a , string b , string c ){
+ SyncClusterConnection::SyncClusterConnection( string a , string b , string c ) : _mutex("SyncClusterConnection") {
_address = a + "," + b + "," + c;
// connect to all even if not working
_connect( a );
@@ -44,7 +58,7 @@ namespace mongo {
_connect( c );
}
- SyncClusterConnection::SyncClusterConnection( SyncClusterConnection& prev ){
+ SyncClusterConnection::SyncClusterConnection( SyncClusterConnection& prev ) : _mutex("SyncClusterConnection") {
assert(0);
}
@@ -55,6 +69,7 @@ namespace mongo {
}
bool SyncClusterConnection::prepare( string& errmsg ){
+ _lastErrors.clear();
return fsync( errmsg );
}
@@ -79,7 +94,7 @@ namespace mongo {
}
void SyncClusterConnection::_checkLast(){
- vector<BSONObj> all;
+ _lastErrors.clear();
vector<string> errors;
for ( size_t i=0; i<_conns.size(); i++ ){
@@ -95,17 +110,17 @@ namespace mongo {
catch ( ... ){
err += "unknown failure";
}
- all.push_back( res );
+ _lastErrors.push_back( res.getOwned() );
errors.push_back( err );
}
-
- assert( all.size() == errors.size() && all.size() == _conns.size() );
+
+ assert( _lastErrors.size() == errors.size() && _lastErrors.size() == _conns.size() );
stringstream err;
bool ok = true;
for ( size_t i = 0; i<_conns.size(); i++ ){
- BSONObj res = all[i];
+ BSONObj res = _lastErrors[i];
if ( res["ok"].trueValue() && res["fsyncFiles"].numberInt() > 0 )
continue;
ok = false;
@@ -117,35 +132,71 @@ namespace mongo {
throw UserException( 8001 , (string)"SyncClusterConnection write op failed: " + err.str() );
}
+ BSONObj SyncClusterConnection::getLastErrorDetailed(){
+ if ( _lastErrors.size() )
+ return _lastErrors[0];
+ return DBClientBase::getLastErrorDetailed();
+ }
+
void SyncClusterConnection::_connect( string host ){
log() << "SyncClusterConnection connecting to [" << host << "]" << endl;
DBClientConnection * c = new DBClientConnection( true );
string errmsg;
if ( ! c->connect( host , errmsg ) )
log() << "SyncClusterConnection connect fail to: " << host << " errmsg: " << errmsg << endl;
+ _connAddresses.push_back( host );
_conns.push_back( c );
}
- auto_ptr<DBClientCursor> SyncClusterConnection::query(const string &ns, Query query, int nToReturn, int nToSkip,
- const BSONObj *fieldsToReturn, int queryOptions, int batchSize ){
+ bool SyncClusterConnection::callRead( Message& toSend , Message& response ){
+ // TODO: need to save state of which one to go back to somehow...
+ return _conns[0]->callRead( toSend , response );
+ }
+ BSONObj SyncClusterConnection::findOne(const string &ns, const Query& query, const BSONObj *fieldsToReturn, int queryOptions) {
+
if ( ns.find( ".$cmd" ) != string::npos ){
string cmdName = query.obj.firstElement().fieldName();
- int lockType = 0;
-
- map<string,int>::iterator i = _lockTypes.find( cmdName );
- if ( i == _lockTypes.end() ){
- BSONObj info;
- uassert( 13053 , "help failed" , _commandOnActive( "admin" , BSON( cmdName << "1" << "help" << 1 ) , info ) );
- lockType = info["lockType"].numberInt();
- _lockTypes[cmdName] = lockType;
- }
- else {
- lockType = i->second;
+ int lockType = _lockType( cmdName );
+
+ if ( lockType > 0 ){ // write $cmd
+ string errmsg;
+ if ( ! prepare( errmsg ) )
+ throw UserException( 13104 , (string)"SyncClusterConnection::findOne prepare failed: " + errmsg );
+
+ vector<BSONObj> all;
+ for ( size_t i=0; i<_conns.size(); i++ ){
+ all.push_back( _conns[i]->findOne( ns , query , 0 , queryOptions ).getOwned() );
+ }
+
+ _checkLast();
+
+ for ( size_t i=0; i<all.size(); i++ ){
+ BSONObj temp = all[i];
+ if ( isOk( temp ) )
+ continue;
+ stringstream ss;
+ ss << "write $cmd failed on a shard: " << temp.jsonString();
+ ss << " " << _conns[i]->toString();
+ throw UserException( 13105 , ss.str() );
+ }
+
+ return all[0];
}
-
- uassert( 13054 , (string)"write $cmd not supported in SyncClusterConnection: " + cmdName , lockType <= 0 );
+ }
+
+ return DBClientBase::findOne( ns , query , fieldsToReturn , queryOptions );
+ }
+
+
+ auto_ptr<DBClientCursor> SyncClusterConnection::query(const string &ns, Query query, int nToReturn, int nToSkip,
+ const BSONObj *fieldsToReturn, int queryOptions, int batchSize ){
+ _lastErrors.clear();
+ if ( ns.find( ".$cmd" ) != string::npos ){
+ string cmdName = query.obj.firstElement().fieldName();
+ int lockType = _lockType( cmdName );
+ uassert( 13054 , (string)"write $cmd not supported in SyncClusterConnection::query for:" + cmdName , lockType <= 0 );
}
return _queryOnActive( ns , query , nToReturn , nToSkip , fieldsToReturn , queryOptions , batchSize );
@@ -185,6 +236,10 @@ namespace mongo {
}
void SyncClusterConnection::insert( const string &ns, BSONObj obj ){
+
+ uassert( 13119 , (string)"SyncClusterConnection::insert obj has to have an _id: " + obj.jsonString() ,
+ ns.find( ".system.indexes" ) != string::npos || obj["_id"].type() );
+
string errmsg;
if ( ! prepare( errmsg ) )
throw UserException( 8003 , (string)"SyncClusterConnection::insert prepare failed: " + errmsg );
@@ -201,19 +256,52 @@ namespace mongo {
}
void SyncClusterConnection::remove( const string &ns , Query query, bool justOne ){
- assert(0);
+ string errmsg;
+ if ( ! prepare( errmsg ) )
+ throw UserException( 8020 , (string)"SyncClusterConnection::remove prepare failed: " + errmsg );
+
+ for ( size_t i=0; i<_conns.size(); i++ ){
+ _conns[i]->remove( ns , query , justOne );
+ }
+
+ _checkLast();
}
void SyncClusterConnection::update( const string &ns , Query query , BSONObj obj , bool upsert , bool multi ){
- string errmsg;
- if ( ! prepare( errmsg ) )
- throw UserException( 8005 , (string)"SyncClusterConnection::udpate prepare failed: " + errmsg );
+
+ if ( upsert ){
+ uassert( 13120 , "SyncClusterConnection::update upsert query needs _id" , query.obj["_id"].type() );
+ }
+
+ if ( _writeConcern ){
+ string errmsg;
+ if ( ! prepare( errmsg ) )
+ throw UserException( 8005 , (string)"SyncClusterConnection::udpate prepare failed: " + errmsg );
+ }
for ( size_t i=0; i<_conns.size(); i++ ){
- _conns[i]->update( ns , query , obj , upsert , multi );
+ try {
+ _conns[i]->update( ns , query , obj , upsert , multi );
+ }
+ catch ( std::exception& e ){
+ if ( _writeConcern )
+ throw e;
+ }
}
- _checkLast();
+ if ( _writeConcern ){
+ _checkLast();
+ assert( _lastErrors.size() > 1 );
+
+ int a = _lastErrors[0]["n"].numberInt();
+ for ( unsigned i=1; i<_lastErrors.size(); i++ ){
+ int b = _lastErrors[i]["n"].numberInt();
+ if ( a == b )
+ continue;
+
+ throw UpdateNotTheSame( 8017 , "update not consistent" , _connAddresses , _lastErrors );
+ }
+ }
}
string SyncClusterConnection::_toString() const {
@@ -244,12 +332,41 @@ namespace mongo {
}
void SyncClusterConnection::say( Message &toSend ){
- assert(0);
+ string errmsg;
+ if ( ! prepare( errmsg ) )
+ throw UserException( 13397 , (string)"SyncClusterConnection::say prepare failed: " + errmsg );
+
+ for ( size_t i=0; i<_conns.size(); i++ ){
+ _conns[i]->say( toSend );
+ }
+
+ _checkLast();
}
void SyncClusterConnection::sayPiggyBack( Message &toSend ){
assert(0);
}
+ int SyncClusterConnection::_lockType( const string& name ){
+ {
+ scoped_lock lk(_mutex);
+ map<string,int>::iterator i = _lockTypes.find( name );
+ if ( i != _lockTypes.end() )
+ return i->second;
+ }
+
+ BSONObj info;
+ uassert( 13053 , "help failed" , _commandOnActive( "admin" , BSON( name << "1" << "help" << 1 ) , info ) );
+
+ int lockType = info["lockType"].numberInt();
+ scoped_lock lk(_mutex);
+ _lockTypes[name] = lockType;
+ return lockType;
+ }
+
+ void SyncClusterConnection::killCursor( long long cursorID ){
+ // should never need to do this
+ assert(0);
+ }
}