diff options
Diffstat (limited to 'db/clientcursor.cpp')
-rw-r--r-- | db/clientcursor.cpp | 310 |
1 files changed, 216 insertions, 94 deletions
diff --git a/db/clientcursor.cpp b/db/clientcursor.cpp index 23ef529..bc09457 100644 --- a/db/clientcursor.cpp +++ b/db/clientcursor.cpp @@ -32,18 +32,18 @@ namespace mongo { - typedef multimap<DiskLoc, ClientCursor*> CCByLoc; - CCById ClientCursor::clientCursorsById; boost::recursive_mutex ClientCursor::ccmutex; long long ClientCursor::numberTimedOut = 0; - /*static*/ void ClientCursor::assertNoCursors() { + void aboutToDeleteForSharding( const Database* db , const DiskLoc& dl ); // from s/d_logic.h + + /*static*/ void ClientCursor::assertNoCursors() { recursive_scoped_lock lock(ccmutex); - if( clientCursorsById.size() ) { + if( clientCursorsById.size() ) { log() << "ERROR clientcursors exist but should not at this point" << endl; ClientCursor *cc = clientCursorsById.begin()->second; - log() << "first one: " << cc->cursorid << ' ' << cc->ns << endl; + log() << "first one: " << cc->_cursorid << ' ' << cc->_ns << endl; clientCursorsById.clear(); assert(false); } @@ -51,18 +51,19 @@ namespace mongo { void ClientCursor::setLastLoc_inlock(DiskLoc L) { + assert( _pos != -2 ); // defensive - see ~ClientCursor + if ( L == _lastLoc ) return; CCByLoc& bl = byLoc(); + if ( !_lastLoc.isNull() ) { - CCByLoc::iterator i = kv_find(bl, _lastLoc, this); - if ( i != bl.end() ) - bl.erase(i); + bl.erase( ByLocKey( _lastLoc, _cursorid ) ); } if ( !L.isNull() ) - bl.insert( make_pair(L, this) ); + bl[ByLocKey(L,_cursorid)] = this; _lastLoc = L; } @@ -74,8 +75,8 @@ namespace mongo { /* todo: this implementation is incomplete. we use it as a prefix for dropDatabase, which works fine as the prefix will end with '.'. however, when used with drop and - dropIndexes, this could take out cursors that belong to something else -- if you - drop "foo", currently, this will kill cursors for "foobar". + dropIndexes, this could take out cursors that belong to something else -- if you + drop "foo", currently, this will kill cursors for "foobar". */ void ClientCursor::invalidate(const char *nsPrefix) { vector<ClientCursor*> toDelete; @@ -84,6 +85,7 @@ namespace mongo { assert( len > 0 && strchr(nsPrefix, '.') ); { + //cout << "\nTEMP invalidate " << nsPrefix << endl; recursive_scoped_lock lock(ccmutex); Database *db = cc().database(); @@ -92,18 +94,18 @@ namespace mongo { for( CCById::iterator i = clientCursorsById.begin(); i != clientCursorsById.end(); ++i ) { ClientCursor *cc = i->second; - if( cc->_db != db ) + if( cc->_db != db ) continue; - if ( strncmp(nsPrefix, cc->ns.c_str(), len) == 0 ) { + if ( strncmp(nsPrefix, cc->_ns.c_str(), len) == 0 ) { toDelete.push_back(i->second); } } /* note : we can't iterate byloc because clientcursors may exist with a loc of null in which case - they are not in the map. perhaps they should not exist though in the future? something to + they are not in the map. perhaps they should not exist though in the future? something to change??? - + CCByLoc& bl = db->ccByLoc; for ( CCByLoc::iterator i = bl.begin(); i != bl.end(); ++i ) { ClientCursor *cc = i->second; @@ -115,10 +117,16 @@ namespace mongo { for ( vector<ClientCursor*>::iterator i = toDelete.begin(); i != toDelete.end(); ++i ) delete (*i); + + /*cout << "TEMP after invalidate " << endl; + for( auto i = clientCursorsById.begin(); i != clientCursorsById.end(); ++i ) { + cout << " " << i->second->ns << endl; + } + cout << "TEMP after invalidate done" << endl;*/ } } - bool ClientCursor::shouldTimeout( unsigned millis ){ + bool ClientCursor::shouldTimeout( unsigned millis ) { _idleAgeMillis += millis; return _idleAgeMillis > 600000 && _pinValue == 0; } @@ -130,9 +138,9 @@ namespace mongo { for ( CCById::iterator i = clientCursorsById.begin(); i != clientCursorsById.end(); ) { CCById::iterator j = i; i++; - if( j->second->shouldTimeout( millis ) ){ + if( j->second->shouldTimeout( millis ) ) { numberTimedOut++; - log(1) << "killing old cursor " << j->second->cursorid << ' ' << j->second->ns + log(1) << "killing old cursor " << j->second->_cursorid << ' ' << j->second->_ns << " idle:" << j->second->idleTime() << "ms\n"; delete j->second; } @@ -150,10 +158,10 @@ namespace mongo { log() << "perf warning: byLoc.size=" << bl.size() << " in aboutToDeleteBucket\n"; } for ( CCByLoc::iterator i = bl.begin(); i != bl.end(); i++ ) - i->second->c->aboutToDeleteBucket(b); + i->second->_c->aboutToDeleteBucket(b); } void aboutToDeleteBucket(const DiskLoc& b) { - ClientCursor::informAboutToDeleteBucket(b); + ClientCursor::informAboutToDeleteBucket(b); } /* must call this on a delete so we clean up the cursors. */ @@ -162,9 +170,12 @@ namespace mongo { Database *db = cc().database(); assert(db); + + aboutToDeleteForSharding( db , dl ); + CCByLoc& bl = db->ccByLoc; - CCByLoc::iterator j = bl.lower_bound(dl); - CCByLoc::iterator stop = bl.upper_bound(dl); + CCByLoc::iterator j = bl.lower_bound(ByLocKey::min(dl)); + CCByLoc::iterator stop = bl.upper_bound(ByLocKey::max(dl)); if ( j == stop ) return; @@ -172,26 +183,45 @@ namespace mongo { while ( 1 ) { toAdvance.push_back(j->second); - DEV assert( j->first == dl ); + DEV assert( j->first.loc == dl ); ++j; if ( j == stop ) break; } - wassert( toAdvance.size() < 5000 ); - - for ( vector<ClientCursor*>::iterator i = toAdvance.begin(); i != toAdvance.end(); ++i ){ + if( toAdvance.size() >= 3000 ) { + log() << "perf warning MPW101: " << toAdvance.size() << " cursors for one diskloc " + << dl.toString() + << ' ' << toAdvance[1000]->_ns + << ' ' << toAdvance[2000]->_ns + << ' ' << toAdvance[1000]->_pinValue + << ' ' << toAdvance[2000]->_pinValue + << ' ' << toAdvance[1000]->_pos + << ' ' << toAdvance[2000]->_pos + << ' ' << toAdvance[1000]->_idleAgeMillis + << ' ' << toAdvance[2000]->_idleAgeMillis + << ' ' << toAdvance[1000]->_doingDeletes + << ' ' << toAdvance[2000]->_doingDeletes + << endl; + //wassert( toAdvance.size() < 5000 ); + } + + for ( vector<ClientCursor*>::iterator i = toAdvance.begin(); i != toAdvance.end(); ++i ) { ClientCursor* cc = *i; wassert(cc->_db == db); - + if ( cc->_doingDeletes ) continue; - Cursor *c = cc->c.get(); - if ( c->capped() ){ + Cursor *c = cc->_c.get(); + if ( c->capped() ) { + /* note we cannot advance here. if this condition occurs, writes to the oplog + have "caught" the reader. skipping ahead, the reader would miss postentially + important data. + */ delete cc; continue; } - + c->checkLocation(); DiskLoc tmp1 = c->refLoc(); if ( tmp1 != dl ) { @@ -213,53 +243,131 @@ namespace mongo { } void aboutToDelete(const DiskLoc& dl) { ClientCursor::aboutToDelete(dl); } + ClientCursor::ClientCursor(int queryOptions, const shared_ptr<Cursor>& c, const string& ns, BSONObj query ) : + _ns(ns), _db( cc().database() ), + _c(c), _pos(0), + _query(query), _queryOptions(queryOptions), + _idleAgeMillis(0), _pinValue(0), + _doingDeletes(false), _yieldSometimesTracker(128,10) { + assert( _db ); + assert( str::startsWith(_ns, _db->name) ); + if( queryOptions & QueryOption_NoCursorTimeout ) + noTimeout(); + recursive_scoped_lock lock(ccmutex); + _cursorid = allocCursorId_inlock(); + clientCursorsById.insert( make_pair(_cursorid, this) ); + + if ( ! _c->modifiedKeys() ) { + // store index information so we can decide if we can + // get something out of the index key rather than full object + + int x = 0; + BSONObjIterator i( _c->indexKeyPattern() ); + while ( i.more() ) { + BSONElement e = i.next(); + if ( e.isNumber() ) { + // only want basic index fields, not "2d" etc + _indexedFields[e.fieldName()] = x; + } + x++; + } + } + + } + + ClientCursor::~ClientCursor() { - assert( pos != -2 ); + assert( _pos != -2 ); { recursive_scoped_lock lock(ccmutex); setLastLoc_inlock( DiskLoc() ); // removes us from bylocation multimap - clientCursorsById.erase(cursorid); + clientCursorsById.erase(_cursorid); // defensive: - (CursorId&) cursorid = -1; - pos = -2; + (CursorId&)_cursorid = -1; + _pos = -2; + } + } + + bool ClientCursor::getFieldsDotted( const string& name, BSONElementSet &ret ) { + + map<string,int>::const_iterator i = _indexedFields.find( name ); + if ( i == _indexedFields.end() ) { + current().getFieldsDotted( name , ret ); + return false; + } + + int x = i->second; + + BSONObjIterator it( currKey() ); + while ( x && it.more() ) { + it.next(); + x--; } + assert( x == 0 ); + ret.insert( it.next() ); + return true; + } + + BSONElement ClientCursor::getFieldDotted( const string& name , bool * fromKey ) { + + map<string,int>::const_iterator i = _indexedFields.find( name ); + if ( i == _indexedFields.end() ) { + if ( fromKey ) + *fromKey = false; + return current().getFieldDotted( name ); + } + + int x = i->second; + + BSONObjIterator it( currKey() ); + while ( x && it.more() ) { + it.next(); + x--; + } + assert( x == 0 ); + + if ( fromKey ) + *fromKey = true; + return it.next(); } + /* call when cursor's location changes so that we can update the cursorsbylocation map. if you are locked and internally iterating, only need to call when you are ready to "unlock". */ void ClientCursor::updateLocation() { - assert( cursorid ); + assert( _cursorid ); _idleAgeMillis = 0; - DiskLoc cl = c->refLoc(); + DiskLoc cl = _c->refLoc(); if ( lastLoc() == cl ) { //log() << "info: lastloc==curloc " << ns << '\n'; - } else { + } + else { recursive_scoped_lock lock(ccmutex); setLastLoc_inlock(cl); } // may be necessary for MultiCursor even when cl hasn't changed - c->noteLocation(); + _c->noteLocation(); } - + int ClientCursor::yieldSuggest() { int writers = 0; int readers = 0; - + int micros = Client::recommendedYieldMicros( &writers , &readers ); - - if ( micros > 0 && writers == 0 && dbMutex.getState() <= 0 ){ + + if ( micros > 0 && writers == 0 && dbMutex.getState() <= 0 ) { // we have a read lock, and only reads are coming on, so why bother unlocking micros = 0; } - + return micros; } - - bool ClientCursor::yieldSometimes(){ + + bool ClientCursor::yieldSometimes() { if ( ! _yieldSometimesTracker.ping() ) return true; @@ -267,82 +375,83 @@ namespace mongo { return ( micros > 0 ) ? yield( micros ) : true; } - void ClientCursor::staticYield( int micros ) { + void ClientCursor::staticYield( int micros , const StringData& ns ) { + killCurrentOp.checkForInterrupt( false ); { dbtempreleasecond unlock; - if ( unlock.unlocked() ){ + if ( unlock.unlocked() ) { if ( micros == -1 ) micros = Client::recommendedYieldMicros(); if ( micros > 0 ) - sleepmicros( micros ); + sleepmicros( micros ); } else { - log( LL_WARNING ) << "ClientCursor::yield can't unlock b/c of recursive lock" << endl; + warning() << "ClientCursor::yield can't unlock b/c of recursive lock ns: " << ns << endl; } - } + } } - + bool ClientCursor::prepareToYield( YieldData &data ) { - if ( ! c->supportYields() ) + if ( ! _c->supportYields() ) return false; // need to store in case 'this' gets deleted - data._id = cursorid; - + data._id = _cursorid; + data._doingDeletes = _doingDeletes; _doingDeletes = false; - + updateLocation(); - + { - /* a quick test that our temprelease is safe. - todo: make a YieldingCursor class + /* a quick test that our temprelease is safe. + todo: make a YieldingCursor class and then make the following code part of a unit test. */ const int test = 0; static bool inEmpty = false; - if( test && !inEmpty ) { + if( test && !inEmpty ) { inEmpty = true; log() << "TEST: manipulate collection during cc:yield" << endl; - if( test == 1 ) - Helpers::emptyCollection(ns.c_str()); + if( test == 1 ) + Helpers::emptyCollection(_ns.c_str()); else if( test == 2 ) { BSONObjBuilder b; string m; - dropCollection(ns.c_str(), m, b); + dropCollection(_ns.c_str(), m, b); } - else { - dropDatabase(ns.c_str()); + else { + dropDatabase(_ns.c_str()); } } - } + } return true; } - + bool ClientCursor::recoverFromYield( const YieldData &data ) { ClientCursor *cc = ClientCursor::find( data._id , false ); - if ( cc == 0 ){ + if ( cc == 0 ) { // id was deleted return false; } - + cc->_doingDeletes = data._doingDeletes; - cc->c->checkLocation(); - return true; + cc->_c->checkLocation(); + return true; } - + bool ClientCursor::yield( int micros ) { - if ( ! c->supportYields() ) + if ( ! _c->supportYields() ) return true; - YieldData data; + YieldData data; prepareToYield( data ); - - staticYield( micros ); + + staticYield( micros , _ns ); return ClientCursor::recoverFromYield( data ); } int ctmLast = 0; // so we don't have to do find() which is a little slow very often. long long ClientCursor::allocCursorId_inlock() { - if( 0 ) { + if( 0 ) { static long long z; ++z; cout << "TEMP alloccursorid " << z << endl; @@ -362,32 +471,32 @@ namespace mongo { return x; } - void ClientCursor::storeOpForSlave( DiskLoc last ){ + void ClientCursor::storeOpForSlave( DiskLoc last ) { if ( ! ( _queryOptions & QueryOption_OplogReplay )) return; if ( last.isNull() ) return; - + BSONElement e = last.obj()["ts"]; if ( e.type() == Date || e.type() == Timestamp ) _slaveReadTill = e._opTime(); } - - void ClientCursor::updateSlaveLocation( CurOp& curop ){ + + void ClientCursor::updateSlaveLocation( CurOp& curop ) { if ( _slaveReadTill.isNull() ) return; - mongo::updateSlaveLocation( curop , ns.c_str() , _slaveReadTill ); + mongo::updateSlaveLocation( curop , _ns.c_str() , _slaveReadTill ); } - void ClientCursor::appendStats( BSONObjBuilder& result ){ + void ClientCursor::appendStats( BSONObjBuilder& result ) { recursive_scoped_lock lock(ccmutex); - result.appendNumber("totalOpen", (int)clientCursorsById.size() ); + result.appendNumber("totalOpen", clientCursorsById.size() ); result.appendNumber("clientCursors_size", (int) numCursors()); - result.appendNumber("timedOut" , (int)numberTimedOut); + result.appendNumber("timedOut" , numberTimedOut); } - + // QUESTION: Restrict to the namespace from which this command was issued? // Alternatively, make this command admin-only? class CmdCursorInfo : public Command { @@ -398,19 +507,19 @@ namespace mongo { help << " example: { cursorInfo : 1 }"; } virtual LockType locktype() const { return NONE; } - bool run(const string& dbname, BSONObj& jsobj, string& errmsg, BSONObjBuilder& result, bool fromRepl ){ + bool run(const string& dbname, BSONObj& jsobj, string& errmsg, BSONObjBuilder& result, bool fromRepl ) { ClientCursor::appendStats( result ); return true; } } cmdCursorInfo; - - void ClientCursorMonitor::run(){ + + void ClientCursorMonitor::run() { Client::initThread("clientcursormon"); Client& client = cc(); - + unsigned old = curTimeMillis(); - while ( ! inShutdown() ){ + while ( ! inShutdown() ) { unsigned now = curTimeMillis(); ClientCursor::idleTimeReport( now - old ); old = now; @@ -420,15 +529,28 @@ namespace mongo { client.shutdown(); } - void ClientCursor::find( const string& ns , set<CursorId>& all ){ + void ClientCursor::find( const string& ns , set<CursorId>& all ) { recursive_scoped_lock lock(ccmutex); - - for ( CCById::iterator i=clientCursorsById.begin(); i!=clientCursorsById.end(); ++i ){ - if ( i->second->ns == ns ) + + for ( CCById::iterator i=clientCursorsById.begin(); i!=clientCursorsById.end(); ++i ) { + if ( i->second->_ns == ns ) all.insert( i->first ); } } + int ClientCursor::erase(int n, long long *ids) { + int found = 0; + for ( int i = 0; i < n; i++ ) { + if ( erase(ids[i]) ) + found++; + + if ( inShutdown() ) + break; + } + return found; + + } + ClientCursorMonitor clientCursorMonitor; |