diff options
Diffstat (limited to 'db/curop.h')
-rw-r--r-- | db/curop.h | 358 |
1 files changed, 208 insertions, 150 deletions
@@ -1,4 +1,5 @@ -// curop.h +// @file curop.h + /* * Copyright (C) 2010 10gen Inc. * @@ -18,152 +19,188 @@ #pragma once -#include "namespace.h" +#include "namespace-inl.h" #include "client.h" #include "../bson/util/atomic_int.h" +#include "../util/concurrency/spin_lock.h" +#include "../util/time_support.h" #include "db.h" +#include "../scripting/engine.h" -namespace mongo { +namespace mongo { /* lifespan is different than CurOp because of recursives with DBDirectClient */ class OpDebug { public: StringBuilder str; - - void reset(){ - str.reset(); - } + void reset() { str.reset(); } }; - - /* Current operation (for the current Client). - an embedded member of Client class, and typically used from within the mutex there. */ - class CurOp : boost::noncopyable { - static AtomicUInt _nextOpNum; + + /** + * stores a copy of a bson obj in a fixed size buffer + * if its too big for the buffer, says "too big" + * useful for keeping a copy around indefinitely without wasting a lot of space or doing malloc + */ + class CachedBSONObj { + public: + enum { TOO_BIG_SENTINEL = 1 } ; static BSONObj _tooBig; // { $msg : "query not recording (too large)" } - - Client * _client; - CurOp * _wrapped; - unsigned long long _start; - unsigned long long _checkpoint; - unsigned long long _end; + CachedBSONObj() { + _size = (int*)_buf; + reset(); + } - bool _active; - int _op; - bool _command; - int _lockType; // see concurrency.h for values - bool _waitingForLock; - int _dbprofile; // 0=off, 1=slow, 2=all - AtomicUInt _opNum; - char _ns[Namespace::MaxNsLen+2]; - struct SockAddr _remote; - char _queryBuf[256]; - - void resetQuery(int x=0) { *((int *)_queryBuf) = x; } - - OpDebug _debug; - - ThreadSafeString _message; - ProgressMeter _progressMeter; + void reset( int sz = 0 ) { + _lock.lock(); + _reset( sz ); + _lock.unlock(); + } + + void set( const BSONObj& o ) { + _lock.lock(); + try { + int sz = o.objsize(); + + if ( sz > (int) sizeof(_buf) ) { + _reset(TOO_BIG_SENTINEL); + } + else { + memcpy(_buf, o.objdata(), sz ); + } + + _lock.unlock(); + } + catch ( ... ) { + _lock.unlock(); + throw; + } - void _reset(){ - _command = false; - _lockType = 0; - _dbprofile = 0; - _end = 0; - _waitingForLock = false; - _message = ""; - _progressMeter.finished(); } - void setNS(const char *ns) { - strncpy(_ns, ns, Namespace::MaxNsLen); + int size() const { return *_size; } + bool have() const { return size() > 0; } + + BSONObj get() { + _lock.lock(); + BSONObj o; + try { + o = _get(); + _lock.unlock(); + } + catch ( ... ) { + _lock.unlock(); + throw; + } + return o; + } + + void append( BSONObjBuilder& b , const StringData& name ) { + _lock.lock(); + try { + BSONObj temp = _get(); + b.append( name , temp ); + _lock.unlock(); + } + catch ( ... ) { + _lock.unlock(); + throw; + } } + private: + /** you have to be locked when you call this */ + BSONObj _get() { + int sz = size(); + if ( sz == 0 ) + return BSONObj(); + if ( sz == TOO_BIG_SENTINEL ) + return _tooBig; + return BSONObj( _buf ).copy(); + } + + /** you have to be locked when you call this */ + void _reset( int sz ) { _size[0] = sz; } + + SpinLock _lock; + int * _size; + char _buf[512]; + }; + + /* Current operation (for the current Client). + an embedded member of Client class, and typically used from within the mutex there. + */ + class CurOp : boost::noncopyable { public: - - int querySize() const { return *((int *) _queryBuf); } - bool haveQuery() const { return querySize() != 0; } + CurOp( Client * client , CurOp * wrapped = 0 ); + ~CurOp(); - BSONObj query( bool threadSafe = false); + bool haveQuery() const { return _query.have(); } + BSONObj query() { return _query.get(); } - void ensureStarted(){ + void ensureStarted() { if ( _start == 0 ) - _start = _checkpoint = curTimeMicros64(); + _start = _checkpoint = curTimeMicros64(); } - void enter( Client::Context * context ){ + void enter( Client::Context * context ) { ensureStarted(); setNS( context->ns() ); if ( context->_db && context->_db->profile > _dbprofile ) _dbprofile = context->_db->profile; } - void leave( Client::Context * context ){ + void leave( Client::Context * context ) { unsigned long long now = curTimeMicros64(); Top::global.record( _ns , _op , _lockType , now - _checkpoint , _command ); _checkpoint = now; } - void reset(){ + void reset() { _reset(); _start = _checkpoint = 0; _active = true; _opNum = _nextOpNum++; _ns[0] = '?'; // just in case not set later _debug.reset(); - resetQuery(); + _query.reset(); } - + void reset( const SockAddr & remote, int op ) { reset(); _remote = remote; _op = op; } - - void markCommand(){ - _command = true; - } - void waitingForLock( int type ){ + void markCommand() { _command = true; } + + void waitingForLock( int type ) { _waitingForLock = true; if ( type > 0 ) _lockType = 1; else _lockType = -1; } - void gotLock(){ - _waitingForLock = false; - } - - OpDebug& debug(){ - return _debug; - } - - int profileLevel() const { - return _dbprofile; - } - - const char * getNS() const { - return _ns; - } + void gotLock() { _waitingForLock = false; } + OpDebug& debug() { return _debug; } + int profileLevel() const { return _dbprofile; } + const char * getNS() const { return _ns; } bool shouldDBProfile( int ms ) const { if ( _dbprofile <= 0 ) return false; - + return _dbprofile >= 2 || ms >= cmdLine.slowMS; } - + AtomicUInt opNum() const { return _opNum; } /** if this op is running */ bool active() const { return _active; } - + int getLockType() const { return _lockType; } - bool isWaitingForLock() const { return _waitingForLock; } + bool isWaitingForLock() const { return _waitingForLock; } int getOp() const { return _op; } - - + /** micros */ unsigned long long startTime() { ensureStarted(); @@ -174,75 +211,41 @@ namespace mongo { _active = false; _end = curTimeMicros64(); } - + unsigned long long totalTimeMicros() { massert( 12601 , "CurOp not marked done yet" , ! _active ); return _end - startTime(); } - int totalTimeMillis() { - return (int) (totalTimeMicros() / 1000); - } + int totalTimeMillis() { return (int) (totalTimeMicros() / 1000); } int elapsedMillis() { unsigned long long total = curTimeMicros64() - startTime(); return (int) (total / 1000); } - int elapsedSeconds() { - return elapsedMillis() / 1000; - } + int elapsedSeconds() { return elapsedMillis() / 1000; } - void setQuery(const BSONObj& query) { - if( query.objsize() > (int) sizeof(_queryBuf) ) { - resetQuery(1); // flag as too big and return - return; - } - memcpy(_queryBuf, query.objdata(), query.objsize()); - } + void setQuery(const BSONObj& query) { _query.set( query ); } - Client * getClient() const { - return _client; - } + Client * getClient() const { return _client; } - CurOp( Client * client , CurOp * wrapped = 0 ) { - _client = client; - _wrapped = wrapped; - if ( _wrapped ){ - _client->_curOp = this; - } - _start = _checkpoint = 0; - _active = false; - _reset(); - _op = 0; - // These addresses should never be written to again. The zeroes are - // placed here as a precaution because currentOp may be accessed - // without the db mutex. - memset(_ns, 0, sizeof(_ns)); - memset(_queryBuf, 0, sizeof(_queryBuf)); - } - - ~CurOp(); - - BSONObj info() { - if( ! cc().getAuthenticationInfo()->isAuthorized("admin") ) { + BSONObj info() { + if( ! cc().getAuthenticationInfo()->isAuthorized("admin") ) { BSONObjBuilder b; b.append("err", "unauthorized"); return b.obj(); } return infoNoauth(); } - - BSONObj infoNoauth( int attempt = 0 ); - string getRemoteString( bool includePort = true ){ - return _remote.toString(includePort); - } + BSONObj infoNoauth(); - ProgressMeter& setMessage( const char * msg , long long progressMeterTotal = 0 , int secondsBetween = 3 ){ + string getRemoteString( bool includePort = true ) { return _remote.toString(includePort); } - if ( progressMeterTotal ){ - if ( _progressMeter.isActive() ){ + ProgressMeter& setMessage( const char * msg , unsigned long long progressMeterTotal = 0 , int secondsBetween = 3 ) { + if ( progressMeterTotal ) { + if ( _progressMeter.isActive() ) { cout << "about to assert, old _message: " << _message << " new message:" << msg << endl; assert( ! _progressMeter.isActive() ); } @@ -251,38 +254,93 @@ namespace mongo { else { _progressMeter.finished(); } - + _message = msg; - + return _progressMeter; } - + string getMessage() const { return _message.toString(); } ProgressMeter& getProgressMeter() { return _progressMeter; } - + CurOp *parent() const { return _wrapped; } + void kill() { _killed = true; } + bool killed() const { return _killed; } + void setNS(const char *ns) { + strncpy(_ns, ns, Namespace::MaxNsLen); + _ns[Namespace::MaxNsLen] = 0; + } friend class Client; + + private: + static AtomicUInt _nextOpNum; + Client * _client; + CurOp * _wrapped; + unsigned long long _start; + unsigned long long _checkpoint; + unsigned long long _end; + bool _active; + int _op; + bool _command; + int _lockType; // see concurrency.h for values + bool _waitingForLock; + int _dbprofile; // 0=off, 1=slow, 2=all + AtomicUInt _opNum; + char _ns[Namespace::MaxNsLen+2]; + struct SockAddr _remote; + CachedBSONObj _query; + OpDebug _debug; + ThreadSafeString _message; + ProgressMeter _progressMeter; + volatile bool _killed; + + void _reset() { + _command = false; + _lockType = 0; + _dbprofile = 0; + _end = 0; + _waitingForLock = false; + _message = ""; + _progressMeter.finished(); + _killed = false; + } }; - /* 0 = ok - 1 = kill current operation and reset this to 0 - future: maybe use this as a "going away" thing on process termination with a higher flag value + /* _globalKill: we are shutting down + otherwise kill attribute set on specified CurOp + this class does not handle races between interruptJs and the checkForInterrupt functions - those must be + handled by the client of this class */ - extern class KillCurrentOp { - enum { Off, On, All } state; - AtomicUInt toKill; + extern class KillCurrentOp { public: - void killAll() { state = All; } - void kill(AtomicUInt i) { toKill = i; state = On; } - - void checkForInterrupt() { - if( state != Off ) { - if( state == All ) - uasserted(11600,"interrupted at shutdown"); - if( cc().curop()->opNum() == toKill ) { - state = Off; - uasserted(11601,"interrupted"); - } - } + void killAll(); + void kill(AtomicUInt i); + + /** @return true if global interrupt and should terminate the operation */ + bool globalInterruptCheck() const { return _globalKill; } + + void checkForInterrupt( bool heedMutex = true ) { + if ( heedMutex && dbMutex.isWriteLocked() ) + return; + if( _globalKill ) + uasserted(11600,"interrupted at shutdown"); + if( cc().curop()->killed() ) + uasserted(11601,"interrupted"); } + + /** @return "" if not interrupted. otherwise, you should stop. */ + const char *checkForInterruptNoAssert( bool heedMutex = true ) { + if ( heedMutex && dbMutex.isWriteLocked() ) + return ""; + if( _globalKill ) + return "interrupted at shutdown"; + if( cc().curop()->killed() ) + return "interrupted"; + return ""; + } + + private: + void interruptJs( AtomicUInt *op ); + volatile bool _globalKill; } killCurrentOp; + } |