diff options
Diffstat (limited to 's')
| -rw-r--r-- | s/config.cpp | 97 | ||||
| -rw-r--r-- | s/config_migrate.cpp | 13 | ||||
| -rw-r--r-- | s/cursors.cpp | 16 | ||||
| -rw-r--r-- | s/d_migrate.cpp | 12 | ||||
| -rw-r--r-- | s/security.cpp | 4 | ||||
| -rw-r--r-- | s/server.cpp | 2 | ||||
| -rw-r--r-- | s/strategy_shard.cpp | 4 |
7 files changed, 95 insertions, 53 deletions
diff --git a/s/config.cpp b/s/config.cpp index 645e923..0f5aaf1 100644 --- a/s/config.cpp +++ b/s/config.cpp @@ -150,7 +150,30 @@ namespace mongo { _save(); } - return getChunkManager(ns,true,true); + ChunkManagerPtr manager = getChunkManager(ns,true,true); + + // Tell the primary mongod to refresh it's data + // TODO: Think the real fix here is for mongos to just assume all collections sharded, when we get there + for( int i = 0; i < 4; i++ ){ + if( i == 3 ){ + warning() << "too many tries updating initial version of " << ns << " on shard primary " << getPrimary() << + ", other mongoses may not see the collection as sharded immediately" << endl; + break; + } + try { + ShardConnection conn( getPrimary(), ns ); + conn.setVersion(); + conn.done(); + break; + } + catch( DBException& e ){ + warning() << "could not update initial version of " << ns << " on shard primary " << getPrimary() << + causedBy( e ) << endl; + } + sleepsecs( i ); + } + + return manager; } bool DBConfig::removeSharding( const string& ns ) { @@ -192,14 +215,13 @@ namespace mongo { { scoped_lock lk( _lock ); - CollectionInfo& ci = _collections[ns]; - - bool earlyReload = ! ci.isSharded() && ( shouldReload || forceReload ); + bool earlyReload = ! _collections[ns].isSharded() && ( shouldReload || forceReload ); if ( earlyReload ) { // this is to catch cases where there this is a new sharded collection _reload(); - ci = _collections[ns]; } + + CollectionInfo& ci = _collections[ns]; massert( 10181 , (string)"not sharded:" + ns , ci.isSharded() ); assert( ! ci.key().isEmpty() ); @@ -710,42 +732,53 @@ namespace mongo { set<string> got; ScopedDbConnection conn( _primary, 30.0 ); - auto_ptr<DBClientCursor> c = conn->query( ShardNS::settings , BSONObj() ); - assert( c.get() ); - while ( c->more() ) { - BSONObj o = c->next(); - string name = o["_id"].valuestrsafe(); - got.insert( name ); - if ( name == "chunksize" ) { - LOG(1) << "MaxChunkSize: " << o["value"] << endl; - Chunk::MaxChunkSize = o["value"].numberInt() * 1024 * 1024; - } - else if ( name == "balancer" ) { - // ones we ignore here - } - else { - log() << "warning: unknown setting [" << name << "]" << endl; - } - } - if ( ! got.count( "chunksize" ) ) { - conn->insert( ShardNS::settings , BSON( "_id" << "chunksize" << - "value" << (Chunk::MaxChunkSize / ( 1024 * 1024 ) ) ) ); - } + try { + auto_ptr<DBClientCursor> c = conn->query( ShardNS::settings , BSONObj() ); + assert( c.get() ); + while ( c->more() ) { + + BSONObj o = c->next(); + string name = o["_id"].valuestrsafe(); + got.insert( name ); + if ( name == "chunksize" ) { + int csize = o["value"].numberInt(); + + // validate chunksize before proceeding + if ( csize == 0 ) { + // setting was not modified; mark as such + got.erase(name); + log() << "warning: invalid chunksize (" << csize << ") ignored" << endl; + } else { + LOG(1) << "MaxChunkSize: " << csize << endl; + Chunk::MaxChunkSize = csize * 1024 * 1024; + } + } + else if ( name == "balancer" ) { + // ones we ignore here + } + else { + log() << "warning: unknown setting [" << name << "]" << endl; + } + } - // indexes - try { + if ( ! got.count( "chunksize" ) ) { + conn->insert( ShardNS::settings , BSON( "_id" << "chunksize" << + "value" << (Chunk::MaxChunkSize / ( 1024 * 1024 ) ) ) ); + } + + // indexes conn->ensureIndex( ShardNS::chunk , BSON( "ns" << 1 << "min" << 1 ) , true ); conn->ensureIndex( ShardNS::chunk , BSON( "ns" << 1 << "shard" << 1 << "min" << 1 ) , true ); conn->ensureIndex( ShardNS::chunk , BSON( "ns" << 1 << "lastmod" << 1 ) , true ); conn->ensureIndex( ShardNS::shard , BSON( "host" << 1 ) , true ); + + conn.done(); } - catch ( std::exception& e ) { - log( LL_WARNING ) << "couldn't create indexes on config db: " << e.what() << endl; + catch ( DBException& e ) { + warning() << "couldn't load settings or create indexes on config db: " << e.what() << endl; } - - conn.done(); } string ConfigServer::getHost( string name , bool withPort ) { diff --git a/s/config_migrate.cpp b/s/config_migrate.cpp index fff023c..7b0c5a6 100644 --- a/s/config_migrate.cpp +++ b/s/config_migrate.cpp @@ -37,7 +37,18 @@ namespace mongo { if ( cur == 0 ) { ScopedDbConnection conn( _primary ); - conn->insert( "config.version" , BSON( "_id" << 1 << "version" << VERSION ) ); + + // If the cluster has not previously been initialized, we need to set the version before using so + // subsequent mongoses use the config data the same way. This requires all three config servers online + // initially. + try { + conn->insert( "config.version" , BSON( "_id" << 1 << "version" << VERSION ) ); + } + catch( DBException& e ){ + error() << "All config servers must initially be reachable for the cluster to be initialized." << endl; + throw; + } + pool.flush(); assert( VERSION == dbConfigVersion( conn.conn() ) ); conn.done(); diff --git a/s/cursors.cpp b/s/cursors.cpp index 12b3d5e..5957ffc 100644 --- a/s/cursors.cpp +++ b/s/cursors.cpp @@ -82,7 +82,10 @@ namespace mongo { BufBuilder b(32768); int num = 0; - bool sendMore = true; + + // Send more if ntoreturn is 0, or any value > 1 (one is assumed to be a single doc return, with no cursor) + bool sendMore = ntoreturn == 0 || ntoreturn > 1; + ntoreturn = abs( ntoreturn ); while ( _cursor->more() ) { BSONObj o = _cursor->next(); @@ -99,20 +102,15 @@ namespace mongo { break; } - if ( ntoreturn != 0 && ( -1 * num + _totalSent ) == ntoreturn ) { - // hard limit - total to send - sendMore = false; - break; - } - - if ( ntoreturn == 0 && _totalSent == 0 && num > 100 ) { + if ( ntoreturn == 0 && _totalSent == 0 && num >= 100 ) { // first batch should be max 100 unless batch size specified break; } } bool hasMore = sendMore && _cursor->more(); - LOG(6) << "\t hasMore:" << hasMore << " wouldSendMoreIfHad: " << sendMore << " id:" << getId() << " totalSent: " << _totalSent << endl; + LOG(5) << "\t hasMore: " << hasMore << " sendMore: " << sendMore << " cursorMore: " << _cursor->more() << " ntoreturn: " << ntoreturn + << " num: " << num << " wouldSendMoreIfHad: " << sendMore << " id:" << getId() << " totalSent: " << _totalSent << endl; replyToQuery( 0 , r.p() , r.m() , b.buf() , b.len() , num , _totalSent , hasMore ? getId() : 0 ); _totalSent += num; diff --git a/s/d_migrate.cpp b/s/d_migrate.cpp index e24a02d..731761f 100644 --- a/s/d_migrate.cpp +++ b/s/d_migrate.cpp @@ -1445,13 +1445,13 @@ namespace mongo { bool didAnything = false; if ( xfer["deleted"].isABSONObj() ) { - writelock lk(ns); - Client::Context cx(ns); - RemoveSaver rs( "moveChunk" , ns , "removedDuring" ); BSONObjIterator i( xfer["deleted"].Obj() ); while ( i.more() ) { + writelock lk(ns); + Client::Context cx(ns); + BSONObj id = i.next().Obj(); // do not apply deletes if they do not belong to the chunk being migrated @@ -1472,11 +1472,11 @@ namespace mongo { } if ( xfer["reload"].isABSONObj() ) { - writelock lk(ns); - Client::Context cx(ns); - BSONObjIterator i( xfer["reload"].Obj() ); while ( i.more() ) { + writelock lk(ns); + Client::Context cx(ns); + BSONObj it = i.next().Obj(); Helpers::upsert( ns , it ); diff --git a/s/security.cpp b/s/security.cpp index 0b8954e..68be68a 100644 --- a/s/security.cpp +++ b/s/security.cpp @@ -42,13 +42,13 @@ namespace mongo { static BSONObj userPattern = BSON("user" << 1); - ShardConnection conn( s, systemUsers ); + ScopedDbConnection conn( s, 30.0 ); OCCASIONALLY conn->ensureIndex(systemUsers, userPattern, false, "user_1"); { BSONObjBuilder b; b << "user" << user; BSONObj query = b.done(); - userObj = conn->findOne(systemUsers, query); + userObj = conn->findOne(systemUsers, query, 0, QueryOption_SlaveOk); if( userObj.isEmpty() ) { log() << "auth: couldn't find user " << user << ", " << systemUsers << endl; conn.done(); // return to pool diff --git a/s/server.cpp b/s/server.cpp index a6ffab9..bf8c215 100644 --- a/s/server.cpp +++ b/s/server.cpp @@ -290,7 +290,7 @@ int _main(int argc, char* argv[]) { shardConnectionPool.addHook( new ShardingConnectionHook( true ) ); shardConnectionPool.setName( "mongos shardconnection connectionpool" ); - + // Mongos shouldn't lazily kill cursors, otherwise we can end up with extras from migration DBClientConnection::setLazyKillCursor( false ); ReplicaSetMonitor::setConfigChangeHook( boost::bind( &ConfigServer::replicaSetChange , &configServer , _1 ) ); diff --git a/s/strategy_shard.cpp b/s/strategy_shard.cpp index c6b30e7..c96a7e1 100644 --- a/s/strategy_shard.cpp +++ b/s/strategy_shard.cpp @@ -91,10 +91,10 @@ namespace mongo { } ShardedClientCursorPtr cc (new ShardedClientCursor( q , cursor )); - if ( ! cc->sendNextBatch( r ) ) { + if ( ! cc->sendNextBatch( r, q.ntoreturn ) ) { return; } - LOG(6) << "storing cursor : " << cc->getId() << endl; + LOG(5) << "storing cursor : " << cc->getId() << endl; cursorCache.store( cc ); } |
