summaryrefslogtreecommitdiff
path: root/s/cursors.cpp
diff options
context:
space:
mode:
Diffstat (limited to 's/cursors.cpp')
-rw-r--r--s/cursors.cpp153
1 files changed, 86 insertions, 67 deletions
diff --git a/s/cursors.cpp b/s/cursors.cpp
index 6dd7a20..cf2735b 100644
--- a/s/cursors.cpp
+++ b/s/cursors.cpp
@@ -21,90 +21,90 @@
#include "../client/connpool.h"
#include "../db/queryutil.h"
#include "../db/commands.h"
-#include "../util/background.h"
+#include "../util/concurrency/task.h"
namespace mongo {
-
+
// -------- ShardedCursor -----------
- ShardedClientCursor::ShardedClientCursor( QueryMessage& q , ClusteredCursor * cursor ){
+ ShardedClientCursor::ShardedClientCursor( QueryMessage& q , ClusteredCursor * cursor ) {
assert( cursor );
_cursor = cursor;
-
+
_skip = q.ntoskip;
_ntoreturn = q.ntoreturn;
-
+
_totalSent = 0;
_done = false;
_id = 0;
-
- if ( q.queryOptions & QueryOption_NoCursorTimeout ){
+
+ if ( q.queryOptions & QueryOption_NoCursorTimeout ) {
_lastAccessMillis = 0;
}
- else
+ else
_lastAccessMillis = Listener::getElapsedTimeMillis();
}
- ShardedClientCursor::~ShardedClientCursor(){
+ ShardedClientCursor::~ShardedClientCursor() {
assert( _cursor );
delete _cursor;
_cursor = 0;
}
- long long ShardedClientCursor::getId(){
- if ( _id <= 0 ){
+ long long ShardedClientCursor::getId() {
+ if ( _id <= 0 ) {
_id = cursorCache.genId();
assert( _id >= 0 );
}
return _id;
}
- void ShardedClientCursor::accessed(){
+ void ShardedClientCursor::accessed() {
if ( _lastAccessMillis > 0 )
_lastAccessMillis = Listener::getElapsedTimeMillis();
}
- long long ShardedClientCursor::idleTime( long long now ){
+ long long ShardedClientCursor::idleTime( long long now ) {
if ( _lastAccessMillis == 0 )
return 0;
return now - _lastAccessMillis;
}
- bool ShardedClientCursor::sendNextBatch( Request& r , int ntoreturn ){
+ bool ShardedClientCursor::sendNextBatch( Request& r , int ntoreturn ) {
uassert( 10191 , "cursor already done" , ! _done );
-
+
int maxSize = 1024 * 1024;
if ( _totalSent > 0 )
maxSize *= 3;
-
+
BufBuilder b(32768);
-
+
int num = 0;
bool sendMore = true;
- while ( _cursor->more() ){
+ while ( _cursor->more() ) {
BSONObj o = _cursor->next();
b.appendBuf( (void*)o.objdata() , o.objsize() );
num++;
-
- if ( b.len() > maxSize ){
+
+ if ( b.len() > maxSize ) {
break;
}
- if ( num == ntoreturn ){
+ if ( num == ntoreturn ) {
// soft limit aka batch size
break;
}
- if ( ntoreturn != 0 && ( -1 * num + _totalSent ) == ntoreturn ){
+ if ( ntoreturn != 0 && ( -1 * num + _totalSent ) == ntoreturn ) {
// hard limit - total to send
sendMore = false;
break;
}
- if ( ntoreturn == 0 && _totalSent == 0 && num > 100 ){
+ if ( ntoreturn == 0 && _totalSent == 0 && num > 100 ) {
// first batch should be max 100 unless batch size specified
break;
}
@@ -112,123 +112,141 @@ namespace mongo {
bool hasMore = sendMore && _cursor->more();
log(6) << "\t hasMore:" << hasMore << " wouldSendMoreIfHad: " << sendMore << " id:" << getId() << " totalSent: " << _totalSent << endl;
-
+
replyToQuery( 0 , r.p() , r.m() , b.buf() , b.len() , num , _totalSent , hasMore ? getId() : 0 );
_totalSent += num;
_done = ! hasMore;
-
+
return hasMore;
}
// ---- CursorCache -----
-
+
long long CursorCache::TIMEOUT = 600000;
CursorCache::CursorCache()
- :_mutex( "CursorCache" ), _shardedTotal(0){
+ :_mutex( "CursorCache" ), _shardedTotal(0) {
}
- CursorCache::~CursorCache(){
+ CursorCache::~CursorCache() {
// TODO: delete old cursors?
int logLevel = 1;
if ( _cursors.size() || _refs.size() )
logLevel = 0;
log( logLevel ) << " CursorCache at shutdown - "
- << " sharded: " << _cursors.size()
+ << " sharded: " << _cursors.size()
<< " passthrough: " << _refs.size()
<< endl;
}
- ShardedClientCursorPtr CursorCache::get( long long id ){
+ ShardedClientCursorPtr CursorCache::get( long long id ) const {
+ LOG(_myLogLevel) << "CursorCache::get id: " << id << endl;
scoped_lock lk( _mutex );
- MapSharded::iterator i = _cursors.find( id );
- if ( i == _cursors.end() ){
+ MapSharded::const_iterator i = _cursors.find( id );
+ if ( i == _cursors.end() ) {
OCCASIONALLY log() << "Sharded CursorCache missing cursor id: " << id << endl;
return ShardedClientCursorPtr();
}
i->second->accessed();
return i->second;
}
-
- void CursorCache::store( ShardedClientCursorPtr cursor ){
+
+ void CursorCache::store( ShardedClientCursorPtr cursor ) {
+ LOG(_myLogLevel) << "CursorCache::store cursor " << " id: " << cursor->getId() << endl;
assert( cursor->getId() );
scoped_lock lk( _mutex );
_cursors[cursor->getId()] = cursor;
_shardedTotal++;
}
- void CursorCache::remove( long long id ){
+ void CursorCache::remove( long long id ) {
assert( id );
scoped_lock lk( _mutex );
_cursors.erase( id );
}
-
- void CursorCache::storeRef( const string& server , long long id ){
+
+ void CursorCache::storeRef( const string& server , long long id ) {
+ LOG(_myLogLevel) << "CursorCache::storeRef server: " << server << " id: " << id << endl;
assert( id );
scoped_lock lk( _mutex );
_refs[id] = server;
}
-
- long long CursorCache::genId(){
- while ( true ){
+
+ string CursorCache::getRef( long long id ) const {
+ LOG(_myLogLevel) << "CursorCache::getRef id: " << id << endl;
+ assert( id );
+ scoped_lock lk( _mutex );
+ MapNormal::const_iterator i = _refs.find( id );
+ if ( i == _refs.end() )
+ return "";
+ return i->second;
+ }
+
+
+ long long CursorCache::genId() {
+ while ( true ) {
long long x = security.getNonce();
if ( x == 0 )
continue;
if ( x < 0 )
x *= -1;
-
+
scoped_lock lk( _mutex );
MapSharded::iterator i = _cursors.find( x );
if ( i != _cursors.end() )
continue;
-
+
MapNormal::iterator j = _refs.find( x );
if ( j != _refs.end() )
continue;
-
+
return x;
}
}
- void CursorCache::gotKillCursors(Message& m ){
+ void CursorCache::gotKillCursors(Message& m ) {
int *x = (int *) m.singleData()->_data;
x++; // reserved
int n = *x++;
- if ( n > 2000 ){
+ if ( n > 2000 ) {
log( n < 30000 ? LL_WARNING : LL_ERROR ) << "receivedKillCursors, n=" << n << endl;
}
uassert( 13286 , "sent 0 cursors to kill" , n >= 1 );
uassert( 13287 , "too many cursors to kill" , n < 30000 );
-
+
long long * cursors = (long long *)x;
- for ( int i=0; i<n; i++ ){
+ for ( int i=0; i<n; i++ ) {
long long id = cursors[i];
- if ( ! id ){
+ LOG(_myLogLevel) << "CursorCache::gotKillCursors id: " << id << endl;
+
+ if ( ! id ) {
log( LL_WARNING ) << " got cursor id of 0 to kill" << endl;
continue;
}
-
- string server;
+
+ string server;
{
scoped_lock lk( _mutex );
MapSharded::iterator i = _cursors.find( id );
- if ( i != _cursors.end() ){
+ if ( i != _cursors.end() ) {
_cursors.erase( i );
continue;
}
-
+
MapNormal::iterator j = _refs.find( id );
- if ( j == _refs.end() ){
+ if ( j == _refs.end() ) {
log( LL_WARNING ) << "can't find cursor: " << id << endl;
continue;
}
server = j->second;
_refs.erase( j );
}
-
+
+ LOG(_myLogLevel) << "CursorCache::found gotKillCursors id: " << id << " server: " << server << endl;
+
assert( server.size() );
ScopedDbConnection conn( server );
conn->killCursor( id );
@@ -236,7 +254,7 @@ namespace mongo {
}
}
- void CursorCache::appendInfo( BSONObjBuilder& result ){
+ void CursorCache::appendInfo( BSONObjBuilder& result ) const {
scoped_lock lk( _mutex );
result.append( "sharded" , (int)_cursors.size() );
result.appendNumber( "shardedEver" , _shardedTotal );
@@ -244,12 +262,12 @@ namespace mongo {
result.append( "totalOpen" , (int)(_cursors.size() + _refs.size() ) );
}
- void CursorCache::doTimeouts(){
+ void CursorCache::doTimeouts() {
long long now = Listener::getElapsedTimeMillis();
scoped_lock lk( _mutex );
- for ( MapSharded::iterator i=_cursors.begin(); i!=_cursors.end(); ++i ){
+ for ( MapSharded::iterator i=_cursors.begin(); i!=_cursors.end(); ++i ) {
long long idleFor = i->second->idleTime( now );
- if ( idleFor < TIMEOUT ){
+ if ( idleFor < TIMEOUT ) {
continue;
}
log() << "killing old cursor " << i->second->getId() << " idle for: " << idleFor << "ms" << endl; // TODO: make log(1)
@@ -258,18 +276,19 @@ namespace mongo {
}
CursorCache cursorCache;
-
- class CursorTimeoutThread : public PeriodicBackgroundJob {
+
+ int CursorCache::_myLogLevel = 3;
+
+ class CursorTimeoutTask : public task::Task {
public:
- CursorTimeoutThread() : PeriodicBackgroundJob( 4000 ){}
- virtual string name() { return "cursorTimeout"; }
- virtual void runLoop(){
+ virtual string name() const { return "cursorTimeout"; }
+ virtual void doWork() {
cursorCache.doTimeouts();
}
- } cursorTimeoutThread;
+ } cursorTimeoutTask;
- void CursorCache::startTimeoutThread(){
- cursorTimeoutThread.go();
+ void CursorCache::startTimeoutThread() {
+ task::repeat( &cursorTimeoutTask , 400 );
}
class CmdCursorInfo : public Command {
@@ -280,7 +299,7 @@ namespace mongo {
help << " example: { cursorInfo : 1 }";
}
virtual LockType locktype() const { return NONE; }
- bool run(const string&, BSONObj& jsobj, string& errmsg, BSONObjBuilder& result, bool fromRepl ){
+ bool run(const string&, BSONObj& jsobj, string& errmsg, BSONObjBuilder& result, bool fromRepl ) {
cursorCache.appendInfo( result );
if ( jsobj["setTimeout"].isNumber() )
CursorCache::TIMEOUT = jsobj["setTimeout"].numberLong();