diff options
Diffstat (limited to 's/cursors.cpp')
-rw-r--r-- | s/cursors.cpp | 199 |
1 files changed, 185 insertions, 14 deletions
diff --git a/s/cursors.cpp b/s/cursors.cpp index a1c9dfa..6dd7a20 100644 --- a/s/cursors.cpp +++ b/s/cursors.cpp @@ -16,10 +16,12 @@ */ -#include "stdafx.h" +#include "pch.h" #include "cursors.h" #include "../client/connpool.h" #include "../db/queryutil.h" +#include "../db/commands.h" +#include "../util/background.h" namespace mongo { @@ -35,11 +37,13 @@ namespace mongo { _totalSent = 0; _done = false; - do { - // TODO: only create _id when needed - _id = security.getNonce(); - } while ( _id == 0 ); - + _id = 0; + + if ( q.queryOptions & QueryOption_NoCursorTimeout ){ + _lastAccessMillis = 0; + } + else + _lastAccessMillis = Listener::getElapsedTimeMillis(); } ShardedClientCursor::~ShardedClientCursor(){ @@ -48,6 +52,25 @@ namespace mongo { _cursor = 0; } + long long ShardedClientCursor::getId(){ + if ( _id <= 0 ){ + _id = cursorCache.genId(); + assert( _id >= 0 ); + } + return _id; + } + + void ShardedClientCursor::accessed(){ + if ( _lastAccessMillis > 0 ) + _lastAccessMillis = Listener::getElapsedTimeMillis(); + } + + long long ShardedClientCursor::idleTime( long long now ){ + if ( _lastAccessMillis == 0 ) + return 0; + return now - _lastAccessMillis; + } + bool ShardedClientCursor::sendNextBatch( Request& r , int ntoreturn ){ uassert( 10191 , "cursor already done" , ! _done ); @@ -63,7 +86,7 @@ namespace mongo { while ( _cursor->more() ){ BSONObj o = _cursor->next(); - b.append( (void*)o.objdata() , o.objsize() ); + b.appendBuf( (void*)o.objdata() , o.objsize() ); num++; if ( b.len() > maxSize ){ @@ -80,41 +103,189 @@ namespace mongo { sendMore = false; break; } + + if ( ntoreturn == 0 && _totalSent == 0 && num > 100 ){ + // first batch should be max 100 unless batch size specified + break; + } } bool hasMore = sendMore && _cursor->more(); - log(6) << "\t hasMore:" << hasMore << " wouldSendMoreIfHad: " << sendMore << " id:" << _id << " totalSent: " << _totalSent << endl; + log(6) << "\t hasMore:" << hasMore << " wouldSendMoreIfHad: " << sendMore << " id:" << getId() << " totalSent: " << _totalSent << endl; - replyToQuery( 0 , r.p() , r.m() , b.buf() , b.len() , num , _totalSent , hasMore ? _id : 0 ); + replyToQuery( 0 , r.p() , r.m() , b.buf() , b.len() , num , _totalSent , hasMore ? getId() : 0 ); _totalSent += num; _done = ! hasMore; return hasMore; } + + // ---- CursorCache ----- + long long CursorCache::TIMEOUT = 600000; - CursorCache::CursorCache(){ + CursorCache::CursorCache() + :_mutex( "CursorCache" ), _shardedTotal(0){ } CursorCache::~CursorCache(){ // TODO: delete old cursors? + int logLevel = 1; + if ( _cursors.size() || _refs.size() ) + logLevel = 0; + log( logLevel ) << " CursorCache at shutdown - " + << " sharded: " << _cursors.size() + << " passthrough: " << _refs.size() + << endl; } - ShardedClientCursor* CursorCache::get( long long id ){ - map<long long,ShardedClientCursor*>::iterator i = _cursors.find( id ); + ShardedClientCursorPtr CursorCache::get( long long id ){ + scoped_lock lk( _mutex ); + MapSharded::iterator i = _cursors.find( id ); if ( i == _cursors.end() ){ OCCASIONALLY log() << "Sharded CursorCache missing cursor id: " << id << endl; - return 0; + return ShardedClientCursorPtr(); } + i->second->accessed(); return i->second; } - void CursorCache::store( ShardedClientCursor * cursor ){ + void CursorCache::store( ShardedClientCursorPtr cursor ){ + assert( cursor->getId() ); + scoped_lock lk( _mutex ); _cursors[cursor->getId()] = cursor; + _shardedTotal++; } void CursorCache::remove( long long id ){ + assert( id ); + scoped_lock lk( _mutex ); _cursors.erase( id ); } + void CursorCache::storeRef( const string& server , long long id ){ + assert( id ); + scoped_lock lk( _mutex ); + _refs[id] = server; + } + + long long CursorCache::genId(){ + while ( true ){ + long long x = security.getNonce(); + if ( x == 0 ) + continue; + if ( x < 0 ) + x *= -1; + + scoped_lock lk( _mutex ); + MapSharded::iterator i = _cursors.find( x ); + if ( i != _cursors.end() ) + continue; + + MapNormal::iterator j = _refs.find( x ); + if ( j != _refs.end() ) + continue; + + return x; + } + } + + void CursorCache::gotKillCursors(Message& m ){ + int *x = (int *) m.singleData()->_data; + x++; // reserved + int n = *x++; + + if ( n > 2000 ){ + log( n < 30000 ? LL_WARNING : LL_ERROR ) << "receivedKillCursors, n=" << n << endl; + } + + + uassert( 13286 , "sent 0 cursors to kill" , n >= 1 ); + uassert( 13287 , "too many cursors to kill" , n < 30000 ); + + long long * cursors = (long long *)x; + for ( int i=0; i<n; i++ ){ + long long id = cursors[i]; + if ( ! id ){ + log( LL_WARNING ) << " got cursor id of 0 to kill" << endl; + continue; + } + + string server; + { + scoped_lock lk( _mutex ); + + MapSharded::iterator i = _cursors.find( id ); + if ( i != _cursors.end() ){ + _cursors.erase( i ); + continue; + } + + MapNormal::iterator j = _refs.find( id ); + if ( j == _refs.end() ){ + log( LL_WARNING ) << "can't find cursor: " << id << endl; + continue; + } + server = j->second; + _refs.erase( j ); + } + + assert( server.size() ); + ScopedDbConnection conn( server ); + conn->killCursor( id ); + conn.done(); + } + } + + void CursorCache::appendInfo( BSONObjBuilder& result ){ + scoped_lock lk( _mutex ); + result.append( "sharded" , (int)_cursors.size() ); + result.appendNumber( "shardedEver" , _shardedTotal ); + result.append( "refs" , (int)_refs.size() ); + result.append( "totalOpen" , (int)(_cursors.size() + _refs.size() ) ); + } + + void CursorCache::doTimeouts(){ + long long now = Listener::getElapsedTimeMillis(); + scoped_lock lk( _mutex ); + for ( MapSharded::iterator i=_cursors.begin(); i!=_cursors.end(); ++i ){ + long long idleFor = i->second->idleTime( now ); + if ( idleFor < TIMEOUT ){ + continue; + } + log() << "killing old cursor " << i->second->getId() << " idle for: " << idleFor << "ms" << endl; // TODO: make log(1) + _cursors.erase( i ); + } + } + CursorCache cursorCache; + + class CursorTimeoutThread : public PeriodicBackgroundJob { + public: + CursorTimeoutThread() : PeriodicBackgroundJob( 4000 ){} + virtual string name() { return "cursorTimeout"; } + virtual void runLoop(){ + cursorCache.doTimeouts(); + } + } cursorTimeoutThread; + + void CursorCache::startTimeoutThread(){ + cursorTimeoutThread.go(); + } + + class CmdCursorInfo : public Command { + public: + CmdCursorInfo() : Command( "cursorInfo", true ) {} + virtual bool slaveOk() const { return true; } + virtual void help( stringstream& help ) const { + help << " example: { cursorInfo : 1 }"; + } + virtual LockType locktype() const { return NONE; } + bool run(const string&, BSONObj& jsobj, string& errmsg, BSONObjBuilder& result, bool fromRepl ){ + cursorCache.appendInfo( result ); + if ( jsobj["setTimeout"].isNumber() ) + CursorCache::TIMEOUT = jsobj["setTimeout"].numberLong(); + return true; + } + } cmdCursorInfo; + } |