summaryrefslogtreecommitdiff
path: root/db/clientcursor.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'db/clientcursor.cpp')
-rw-r--r--db/clientcursor.cpp310
1 files changed, 216 insertions, 94 deletions
diff --git a/db/clientcursor.cpp b/db/clientcursor.cpp
index 23ef529..bc09457 100644
--- a/db/clientcursor.cpp
+++ b/db/clientcursor.cpp
@@ -32,18 +32,18 @@
namespace mongo {
- typedef multimap<DiskLoc, ClientCursor*> CCByLoc;
-
CCById ClientCursor::clientCursorsById;
boost::recursive_mutex ClientCursor::ccmutex;
long long ClientCursor::numberTimedOut = 0;
- /*static*/ void ClientCursor::assertNoCursors() {
+ void aboutToDeleteForSharding( const Database* db , const DiskLoc& dl ); // from s/d_logic.h
+
+ /*static*/ void ClientCursor::assertNoCursors() {
recursive_scoped_lock lock(ccmutex);
- if( clientCursorsById.size() ) {
+ if( clientCursorsById.size() ) {
log() << "ERROR clientcursors exist but should not at this point" << endl;
ClientCursor *cc = clientCursorsById.begin()->second;
- log() << "first one: " << cc->cursorid << ' ' << cc->ns << endl;
+ log() << "first one: " << cc->_cursorid << ' ' << cc->_ns << endl;
clientCursorsById.clear();
assert(false);
}
@@ -51,18 +51,19 @@ namespace mongo {
void ClientCursor::setLastLoc_inlock(DiskLoc L) {
+ assert( _pos != -2 ); // defensive - see ~ClientCursor
+
if ( L == _lastLoc )
return;
CCByLoc& bl = byLoc();
+
if ( !_lastLoc.isNull() ) {
- CCByLoc::iterator i = kv_find(bl, _lastLoc, this);
- if ( i != bl.end() )
- bl.erase(i);
+ bl.erase( ByLocKey( _lastLoc, _cursorid ) );
}
if ( !L.isNull() )
- bl.insert( make_pair(L, this) );
+ bl[ByLocKey(L,_cursorid)] = this;
_lastLoc = L;
}
@@ -74,8 +75,8 @@ namespace mongo {
/* todo: this implementation is incomplete. we use it as a prefix for dropDatabase, which
works fine as the prefix will end with '.'. however, when used with drop and
- dropIndexes, this could take out cursors that belong to something else -- if you
- drop "foo", currently, this will kill cursors for "foobar".
+ dropIndexes, this could take out cursors that belong to something else -- if you
+ drop "foo", currently, this will kill cursors for "foobar".
*/
void ClientCursor::invalidate(const char *nsPrefix) {
vector<ClientCursor*> toDelete;
@@ -84,6 +85,7 @@ namespace mongo {
assert( len > 0 && strchr(nsPrefix, '.') );
{
+ //cout << "\nTEMP invalidate " << nsPrefix << endl;
recursive_scoped_lock lock(ccmutex);
Database *db = cc().database();
@@ -92,18 +94,18 @@ namespace mongo {
for( CCById::iterator i = clientCursorsById.begin(); i != clientCursorsById.end(); ++i ) {
ClientCursor *cc = i->second;
- if( cc->_db != db )
+ if( cc->_db != db )
continue;
- if ( strncmp(nsPrefix, cc->ns.c_str(), len) == 0 ) {
+ if ( strncmp(nsPrefix, cc->_ns.c_str(), len) == 0 ) {
toDelete.push_back(i->second);
}
}
/*
note : we can't iterate byloc because clientcursors may exist with a loc of null in which case
- they are not in the map. perhaps they should not exist though in the future? something to
+ they are not in the map. perhaps they should not exist though in the future? something to
change???
-
+
CCByLoc& bl = db->ccByLoc;
for ( CCByLoc::iterator i = bl.begin(); i != bl.end(); ++i ) {
ClientCursor *cc = i->second;
@@ -115,10 +117,16 @@ namespace mongo {
for ( vector<ClientCursor*>::iterator i = toDelete.begin(); i != toDelete.end(); ++i )
delete (*i);
+
+ /*cout << "TEMP after invalidate " << endl;
+ for( auto i = clientCursorsById.begin(); i != clientCursorsById.end(); ++i ) {
+ cout << " " << i->second->ns << endl;
+ }
+ cout << "TEMP after invalidate done" << endl;*/
}
}
- bool ClientCursor::shouldTimeout( unsigned millis ){
+ bool ClientCursor::shouldTimeout( unsigned millis ) {
_idleAgeMillis += millis;
return _idleAgeMillis > 600000 && _pinValue == 0;
}
@@ -130,9 +138,9 @@ namespace mongo {
for ( CCById::iterator i = clientCursorsById.begin(); i != clientCursorsById.end(); ) {
CCById::iterator j = i;
i++;
- if( j->second->shouldTimeout( millis ) ){
+ if( j->second->shouldTimeout( millis ) ) {
numberTimedOut++;
- log(1) << "killing old cursor " << j->second->cursorid << ' ' << j->second->ns
+ log(1) << "killing old cursor " << j->second->_cursorid << ' ' << j->second->_ns
<< " idle:" << j->second->idleTime() << "ms\n";
delete j->second;
}
@@ -150,10 +158,10 @@ namespace mongo {
log() << "perf warning: byLoc.size=" << bl.size() << " in aboutToDeleteBucket\n";
}
for ( CCByLoc::iterator i = bl.begin(); i != bl.end(); i++ )
- i->second->c->aboutToDeleteBucket(b);
+ i->second->_c->aboutToDeleteBucket(b);
}
void aboutToDeleteBucket(const DiskLoc& b) {
- ClientCursor::informAboutToDeleteBucket(b);
+ ClientCursor::informAboutToDeleteBucket(b);
}
/* must call this on a delete so we clean up the cursors. */
@@ -162,9 +170,12 @@ namespace mongo {
Database *db = cc().database();
assert(db);
+
+ aboutToDeleteForSharding( db , dl );
+
CCByLoc& bl = db->ccByLoc;
- CCByLoc::iterator j = bl.lower_bound(dl);
- CCByLoc::iterator stop = bl.upper_bound(dl);
+ CCByLoc::iterator j = bl.lower_bound(ByLocKey::min(dl));
+ CCByLoc::iterator stop = bl.upper_bound(ByLocKey::max(dl));
if ( j == stop )
return;
@@ -172,26 +183,45 @@ namespace mongo {
while ( 1 ) {
toAdvance.push_back(j->second);
- DEV assert( j->first == dl );
+ DEV assert( j->first.loc == dl );
++j;
if ( j == stop )
break;
}
- wassert( toAdvance.size() < 5000 );
-
- for ( vector<ClientCursor*>::iterator i = toAdvance.begin(); i != toAdvance.end(); ++i ){
+ if( toAdvance.size() >= 3000 ) {
+ log() << "perf warning MPW101: " << toAdvance.size() << " cursors for one diskloc "
+ << dl.toString()
+ << ' ' << toAdvance[1000]->_ns
+ << ' ' << toAdvance[2000]->_ns
+ << ' ' << toAdvance[1000]->_pinValue
+ << ' ' << toAdvance[2000]->_pinValue
+ << ' ' << toAdvance[1000]->_pos
+ << ' ' << toAdvance[2000]->_pos
+ << ' ' << toAdvance[1000]->_idleAgeMillis
+ << ' ' << toAdvance[2000]->_idleAgeMillis
+ << ' ' << toAdvance[1000]->_doingDeletes
+ << ' ' << toAdvance[2000]->_doingDeletes
+ << endl;
+ //wassert( toAdvance.size() < 5000 );
+ }
+
+ for ( vector<ClientCursor*>::iterator i = toAdvance.begin(); i != toAdvance.end(); ++i ) {
ClientCursor* cc = *i;
wassert(cc->_db == db);
-
+
if ( cc->_doingDeletes ) continue;
- Cursor *c = cc->c.get();
- if ( c->capped() ){
+ Cursor *c = cc->_c.get();
+ if ( c->capped() ) {
+ /* note we cannot advance here. if this condition occurs, writes to the oplog
+ have "caught" the reader. skipping ahead, the reader would miss postentially
+ important data.
+ */
delete cc;
continue;
}
-
+
c->checkLocation();
DiskLoc tmp1 = c->refLoc();
if ( tmp1 != dl ) {
@@ -213,53 +243,131 @@ namespace mongo {
}
void aboutToDelete(const DiskLoc& dl) { ClientCursor::aboutToDelete(dl); }
+ ClientCursor::ClientCursor(int queryOptions, const shared_ptr<Cursor>& c, const string& ns, BSONObj query ) :
+ _ns(ns), _db( cc().database() ),
+ _c(c), _pos(0),
+ _query(query), _queryOptions(queryOptions),
+ _idleAgeMillis(0), _pinValue(0),
+ _doingDeletes(false), _yieldSometimesTracker(128,10) {
+ assert( _db );
+ assert( str::startsWith(_ns, _db->name) );
+ if( queryOptions & QueryOption_NoCursorTimeout )
+ noTimeout();
+ recursive_scoped_lock lock(ccmutex);
+ _cursorid = allocCursorId_inlock();
+ clientCursorsById.insert( make_pair(_cursorid, this) );
+
+ if ( ! _c->modifiedKeys() ) {
+ // store index information so we can decide if we can
+ // get something out of the index key rather than full object
+
+ int x = 0;
+ BSONObjIterator i( _c->indexKeyPattern() );
+ while ( i.more() ) {
+ BSONElement e = i.next();
+ if ( e.isNumber() ) {
+ // only want basic index fields, not "2d" etc
+ _indexedFields[e.fieldName()] = x;
+ }
+ x++;
+ }
+ }
+
+ }
+
+
ClientCursor::~ClientCursor() {
- assert( pos != -2 );
+ assert( _pos != -2 );
{
recursive_scoped_lock lock(ccmutex);
setLastLoc_inlock( DiskLoc() ); // removes us from bylocation multimap
- clientCursorsById.erase(cursorid);
+ clientCursorsById.erase(_cursorid);
// defensive:
- (CursorId&) cursorid = -1;
- pos = -2;
+ (CursorId&)_cursorid = -1;
+ _pos = -2;
+ }
+ }
+
+ bool ClientCursor::getFieldsDotted( const string& name, BSONElementSet &ret ) {
+
+ map<string,int>::const_iterator i = _indexedFields.find( name );
+ if ( i == _indexedFields.end() ) {
+ current().getFieldsDotted( name , ret );
+ return false;
+ }
+
+ int x = i->second;
+
+ BSONObjIterator it( currKey() );
+ while ( x && it.more() ) {
+ it.next();
+ x--;
}
+ assert( x == 0 );
+ ret.insert( it.next() );
+ return true;
+ }
+
+ BSONElement ClientCursor::getFieldDotted( const string& name , bool * fromKey ) {
+
+ map<string,int>::const_iterator i = _indexedFields.find( name );
+ if ( i == _indexedFields.end() ) {
+ if ( fromKey )
+ *fromKey = false;
+ return current().getFieldDotted( name );
+ }
+
+ int x = i->second;
+
+ BSONObjIterator it( currKey() );
+ while ( x && it.more() ) {
+ it.next();
+ x--;
+ }
+ assert( x == 0 );
+
+ if ( fromKey )
+ *fromKey = true;
+ return it.next();
}
+
/* call when cursor's location changes so that we can update the
cursorsbylocation map. if you are locked and internally iterating, only
need to call when you are ready to "unlock".
*/
void ClientCursor::updateLocation() {
- assert( cursorid );
+ assert( _cursorid );
_idleAgeMillis = 0;
- DiskLoc cl = c->refLoc();
+ DiskLoc cl = _c->refLoc();
if ( lastLoc() == cl ) {
//log() << "info: lastloc==curloc " << ns << '\n';
- } else {
+ }
+ else {
recursive_scoped_lock lock(ccmutex);
setLastLoc_inlock(cl);
}
// may be necessary for MultiCursor even when cl hasn't changed
- c->noteLocation();
+ _c->noteLocation();
}
-
+
int ClientCursor::yieldSuggest() {
int writers = 0;
int readers = 0;
-
+
int micros = Client::recommendedYieldMicros( &writers , &readers );
-
- if ( micros > 0 && writers == 0 && dbMutex.getState() <= 0 ){
+
+ if ( micros > 0 && writers == 0 && dbMutex.getState() <= 0 ) {
// we have a read lock, and only reads are coming on, so why bother unlocking
micros = 0;
}
-
+
return micros;
}
-
- bool ClientCursor::yieldSometimes(){
+
+ bool ClientCursor::yieldSometimes() {
if ( ! _yieldSometimesTracker.ping() )
return true;
@@ -267,82 +375,83 @@ namespace mongo {
return ( micros > 0 ) ? yield( micros ) : true;
}
- void ClientCursor::staticYield( int micros ) {
+ void ClientCursor::staticYield( int micros , const StringData& ns ) {
+ killCurrentOp.checkForInterrupt( false );
{
dbtempreleasecond unlock;
- if ( unlock.unlocked() ){
+ if ( unlock.unlocked() ) {
if ( micros == -1 )
micros = Client::recommendedYieldMicros();
if ( micros > 0 )
- sleepmicros( micros );
+ sleepmicros( micros );
}
else {
- log( LL_WARNING ) << "ClientCursor::yield can't unlock b/c of recursive lock" << endl;
+ warning() << "ClientCursor::yield can't unlock b/c of recursive lock ns: " << ns << endl;
}
- }
+ }
}
-
+
bool ClientCursor::prepareToYield( YieldData &data ) {
- if ( ! c->supportYields() )
+ if ( ! _c->supportYields() )
return false;
// need to store in case 'this' gets deleted
- data._id = cursorid;
-
+ data._id = _cursorid;
+
data._doingDeletes = _doingDeletes;
_doingDeletes = false;
-
+
updateLocation();
-
+
{
- /* a quick test that our temprelease is safe.
- todo: make a YieldingCursor class
+ /* a quick test that our temprelease is safe.
+ todo: make a YieldingCursor class
and then make the following code part of a unit test.
*/
const int test = 0;
static bool inEmpty = false;
- if( test && !inEmpty ) {
+ if( test && !inEmpty ) {
inEmpty = true;
log() << "TEST: manipulate collection during cc:yield" << endl;
- if( test == 1 )
- Helpers::emptyCollection(ns.c_str());
+ if( test == 1 )
+ Helpers::emptyCollection(_ns.c_str());
else if( test == 2 ) {
BSONObjBuilder b; string m;
- dropCollection(ns.c_str(), m, b);
+ dropCollection(_ns.c_str(), m, b);
}
- else {
- dropDatabase(ns.c_str());
+ else {
+ dropDatabase(_ns.c_str());
}
}
- }
+ }
return true;
}
-
+
bool ClientCursor::recoverFromYield( const YieldData &data ) {
ClientCursor *cc = ClientCursor::find( data._id , false );
- if ( cc == 0 ){
+ if ( cc == 0 ) {
// id was deleted
return false;
}
-
+
cc->_doingDeletes = data._doingDeletes;
- cc->c->checkLocation();
- return true;
+ cc->_c->checkLocation();
+ return true;
}
-
+
bool ClientCursor::yield( int micros ) {
- if ( ! c->supportYields() )
+ if ( ! _c->supportYields() )
return true;
- YieldData data;
+ YieldData data;
prepareToYield( data );
-
- staticYield( micros );
+
+ staticYield( micros , _ns );
return ClientCursor::recoverFromYield( data );
}
int ctmLast = 0; // so we don't have to do find() which is a little slow very often.
long long ClientCursor::allocCursorId_inlock() {
- if( 0 ) {
+ if( 0 ) {
static long long z;
++z;
cout << "TEMP alloccursorid " << z << endl;
@@ -362,32 +471,32 @@ namespace mongo {
return x;
}
- void ClientCursor::storeOpForSlave( DiskLoc last ){
+ void ClientCursor::storeOpForSlave( DiskLoc last ) {
if ( ! ( _queryOptions & QueryOption_OplogReplay ))
return;
if ( last.isNull() )
return;
-
+
BSONElement e = last.obj()["ts"];
if ( e.type() == Date || e.type() == Timestamp )
_slaveReadTill = e._opTime();
}
-
- void ClientCursor::updateSlaveLocation( CurOp& curop ){
+
+ void ClientCursor::updateSlaveLocation( CurOp& curop ) {
if ( _slaveReadTill.isNull() )
return;
- mongo::updateSlaveLocation( curop , ns.c_str() , _slaveReadTill );
+ mongo::updateSlaveLocation( curop , _ns.c_str() , _slaveReadTill );
}
- void ClientCursor::appendStats( BSONObjBuilder& result ){
+ void ClientCursor::appendStats( BSONObjBuilder& result ) {
recursive_scoped_lock lock(ccmutex);
- result.appendNumber("totalOpen", (int)clientCursorsById.size() );
+ result.appendNumber("totalOpen", clientCursorsById.size() );
result.appendNumber("clientCursors_size", (int) numCursors());
- result.appendNumber("timedOut" , (int)numberTimedOut);
+ result.appendNumber("timedOut" , numberTimedOut);
}
-
+
// QUESTION: Restrict to the namespace from which this command was issued?
// Alternatively, make this command admin-only?
class CmdCursorInfo : public Command {
@@ -398,19 +507,19 @@ namespace mongo {
help << " example: { cursorInfo : 1 }";
}
virtual LockType locktype() const { return NONE; }
- bool run(const string& dbname, BSONObj& jsobj, string& errmsg, BSONObjBuilder& result, bool fromRepl ){
+ bool run(const string& dbname, BSONObj& jsobj, string& errmsg, BSONObjBuilder& result, bool fromRepl ) {
ClientCursor::appendStats( result );
return true;
}
} cmdCursorInfo;
-
- void ClientCursorMonitor::run(){
+
+ void ClientCursorMonitor::run() {
Client::initThread("clientcursormon");
Client& client = cc();
-
+
unsigned old = curTimeMillis();
- while ( ! inShutdown() ){
+ while ( ! inShutdown() ) {
unsigned now = curTimeMillis();
ClientCursor::idleTimeReport( now - old );
old = now;
@@ -420,15 +529,28 @@ namespace mongo {
client.shutdown();
}
- void ClientCursor::find( const string& ns , set<CursorId>& all ){
+ void ClientCursor::find( const string& ns , set<CursorId>& all ) {
recursive_scoped_lock lock(ccmutex);
-
- for ( CCById::iterator i=clientCursorsById.begin(); i!=clientCursorsById.end(); ++i ){
- if ( i->second->ns == ns )
+
+ for ( CCById::iterator i=clientCursorsById.begin(); i!=clientCursorsById.end(); ++i ) {
+ if ( i->second->_ns == ns )
all.insert( i->first );
}
}
+ int ClientCursor::erase(int n, long long *ids) {
+ int found = 0;
+ for ( int i = 0; i < n; i++ ) {
+ if ( erase(ids[i]) )
+ found++;
+
+ if ( inShutdown() )
+ break;
+ }
+ return found;
+
+ }
+
ClientCursorMonitor clientCursorMonitor;