summaryrefslogtreecommitdiff
path: root/db/curop.h
diff options
context:
space:
mode:
Diffstat (limited to 'db/curop.h')
-rw-r--r--db/curop.h358
1 files changed, 208 insertions, 150 deletions
diff --git a/db/curop.h b/db/curop.h
index bf06a69..c6e949b 100644
--- a/db/curop.h
+++ b/db/curop.h
@@ -1,4 +1,5 @@
-// curop.h
+// @file curop.h
+
/*
* Copyright (C) 2010 10gen Inc.
*
@@ -18,152 +19,188 @@
#pragma once
-#include "namespace.h"
+#include "namespace-inl.h"
#include "client.h"
#include "../bson/util/atomic_int.h"
+#include "../util/concurrency/spin_lock.h"
+#include "../util/time_support.h"
#include "db.h"
+#include "../scripting/engine.h"
-namespace mongo {
+namespace mongo {
/* lifespan is different than CurOp because of recursives with DBDirectClient */
class OpDebug {
public:
StringBuilder str;
-
- void reset(){
- str.reset();
- }
+ void reset() { str.reset(); }
};
-
- /* Current operation (for the current Client).
- an embedded member of Client class, and typically used from within the mutex there. */
- class CurOp : boost::noncopyable {
- static AtomicUInt _nextOpNum;
+
+ /**
+ * stores a copy of a bson obj in a fixed size buffer
+ * if its too big for the buffer, says "too big"
+ * useful for keeping a copy around indefinitely without wasting a lot of space or doing malloc
+ */
+ class CachedBSONObj {
+ public:
+ enum { TOO_BIG_SENTINEL = 1 } ;
static BSONObj _tooBig; // { $msg : "query not recording (too large)" }
-
- Client * _client;
- CurOp * _wrapped;
- unsigned long long _start;
- unsigned long long _checkpoint;
- unsigned long long _end;
+ CachedBSONObj() {
+ _size = (int*)_buf;
+ reset();
+ }
- bool _active;
- int _op;
- bool _command;
- int _lockType; // see concurrency.h for values
- bool _waitingForLock;
- int _dbprofile; // 0=off, 1=slow, 2=all
- AtomicUInt _opNum;
- char _ns[Namespace::MaxNsLen+2];
- struct SockAddr _remote;
- char _queryBuf[256];
-
- void resetQuery(int x=0) { *((int *)_queryBuf) = x; }
-
- OpDebug _debug;
-
- ThreadSafeString _message;
- ProgressMeter _progressMeter;
+ void reset( int sz = 0 ) {
+ _lock.lock();
+ _reset( sz );
+ _lock.unlock();
+ }
+
+ void set( const BSONObj& o ) {
+ _lock.lock();
+ try {
+ int sz = o.objsize();
+
+ if ( sz > (int) sizeof(_buf) ) {
+ _reset(TOO_BIG_SENTINEL);
+ }
+ else {
+ memcpy(_buf, o.objdata(), sz );
+ }
+
+ _lock.unlock();
+ }
+ catch ( ... ) {
+ _lock.unlock();
+ throw;
+ }
- void _reset(){
- _command = false;
- _lockType = 0;
- _dbprofile = 0;
- _end = 0;
- _waitingForLock = false;
- _message = "";
- _progressMeter.finished();
}
- void setNS(const char *ns) {
- strncpy(_ns, ns, Namespace::MaxNsLen);
+ int size() const { return *_size; }
+ bool have() const { return size() > 0; }
+
+ BSONObj get() {
+ _lock.lock();
+ BSONObj o;
+ try {
+ o = _get();
+ _lock.unlock();
+ }
+ catch ( ... ) {
+ _lock.unlock();
+ throw;
+ }
+ return o;
+ }
+
+ void append( BSONObjBuilder& b , const StringData& name ) {
+ _lock.lock();
+ try {
+ BSONObj temp = _get();
+ b.append( name , temp );
+ _lock.unlock();
+ }
+ catch ( ... ) {
+ _lock.unlock();
+ throw;
+ }
}
+ private:
+ /** you have to be locked when you call this */
+ BSONObj _get() {
+ int sz = size();
+ if ( sz == 0 )
+ return BSONObj();
+ if ( sz == TOO_BIG_SENTINEL )
+ return _tooBig;
+ return BSONObj( _buf ).copy();
+ }
+
+ /** you have to be locked when you call this */
+ void _reset( int sz ) { _size[0] = sz; }
+
+ SpinLock _lock;
+ int * _size;
+ char _buf[512];
+ };
+
+ /* Current operation (for the current Client).
+ an embedded member of Client class, and typically used from within the mutex there.
+ */
+ class CurOp : boost::noncopyable {
public:
-
- int querySize() const { return *((int *) _queryBuf); }
- bool haveQuery() const { return querySize() != 0; }
+ CurOp( Client * client , CurOp * wrapped = 0 );
+ ~CurOp();
- BSONObj query( bool threadSafe = false);
+ bool haveQuery() const { return _query.have(); }
+ BSONObj query() { return _query.get(); }
- void ensureStarted(){
+ void ensureStarted() {
if ( _start == 0 )
- _start = _checkpoint = curTimeMicros64();
+ _start = _checkpoint = curTimeMicros64();
}
- void enter( Client::Context * context ){
+ void enter( Client::Context * context ) {
ensureStarted();
setNS( context->ns() );
if ( context->_db && context->_db->profile > _dbprofile )
_dbprofile = context->_db->profile;
}
- void leave( Client::Context * context ){
+ void leave( Client::Context * context ) {
unsigned long long now = curTimeMicros64();
Top::global.record( _ns , _op , _lockType , now - _checkpoint , _command );
_checkpoint = now;
}
- void reset(){
+ void reset() {
_reset();
_start = _checkpoint = 0;
_active = true;
_opNum = _nextOpNum++;
_ns[0] = '?'; // just in case not set later
_debug.reset();
- resetQuery();
+ _query.reset();
}
-
+
void reset( const SockAddr & remote, int op ) {
reset();
_remote = remote;
_op = op;
}
-
- void markCommand(){
- _command = true;
- }
- void waitingForLock( int type ){
+ void markCommand() { _command = true; }
+
+ void waitingForLock( int type ) {
_waitingForLock = true;
if ( type > 0 )
_lockType = 1;
else
_lockType = -1;
}
- void gotLock(){
- _waitingForLock = false;
- }
-
- OpDebug& debug(){
- return _debug;
- }
-
- int profileLevel() const {
- return _dbprofile;
- }
-
- const char * getNS() const {
- return _ns;
- }
+ void gotLock() { _waitingForLock = false; }
+ OpDebug& debug() { return _debug; }
+ int profileLevel() const { return _dbprofile; }
+ const char * getNS() const { return _ns; }
bool shouldDBProfile( int ms ) const {
if ( _dbprofile <= 0 )
return false;
-
+
return _dbprofile >= 2 || ms >= cmdLine.slowMS;
}
-
+
AtomicUInt opNum() const { return _opNum; }
/** if this op is running */
bool active() const { return _active; }
-
+
int getLockType() const { return _lockType; }
- bool isWaitingForLock() const { return _waitingForLock; }
+ bool isWaitingForLock() const { return _waitingForLock; }
int getOp() const { return _op; }
-
-
+
/** micros */
unsigned long long startTime() {
ensureStarted();
@@ -174,75 +211,41 @@ namespace mongo {
_active = false;
_end = curTimeMicros64();
}
-
+
unsigned long long totalTimeMicros() {
massert( 12601 , "CurOp not marked done yet" , ! _active );
return _end - startTime();
}
- int totalTimeMillis() {
- return (int) (totalTimeMicros() / 1000);
- }
+ int totalTimeMillis() { return (int) (totalTimeMicros() / 1000); }
int elapsedMillis() {
unsigned long long total = curTimeMicros64() - startTime();
return (int) (total / 1000);
}
- int elapsedSeconds() {
- return elapsedMillis() / 1000;
- }
+ int elapsedSeconds() { return elapsedMillis() / 1000; }
- void setQuery(const BSONObj& query) {
- if( query.objsize() > (int) sizeof(_queryBuf) ) {
- resetQuery(1); // flag as too big and return
- return;
- }
- memcpy(_queryBuf, query.objdata(), query.objsize());
- }
+ void setQuery(const BSONObj& query) { _query.set( query ); }
- Client * getClient() const {
- return _client;
- }
+ Client * getClient() const { return _client; }
- CurOp( Client * client , CurOp * wrapped = 0 ) {
- _client = client;
- _wrapped = wrapped;
- if ( _wrapped ){
- _client->_curOp = this;
- }
- _start = _checkpoint = 0;
- _active = false;
- _reset();
- _op = 0;
- // These addresses should never be written to again. The zeroes are
- // placed here as a precaution because currentOp may be accessed
- // without the db mutex.
- memset(_ns, 0, sizeof(_ns));
- memset(_queryBuf, 0, sizeof(_queryBuf));
- }
-
- ~CurOp();
-
- BSONObj info() {
- if( ! cc().getAuthenticationInfo()->isAuthorized("admin") ) {
+ BSONObj info() {
+ if( ! cc().getAuthenticationInfo()->isAuthorized("admin") ) {
BSONObjBuilder b;
b.append("err", "unauthorized");
return b.obj();
}
return infoNoauth();
}
-
- BSONObj infoNoauth( int attempt = 0 );
- string getRemoteString( bool includePort = true ){
- return _remote.toString(includePort);
- }
+ BSONObj infoNoauth();
- ProgressMeter& setMessage( const char * msg , long long progressMeterTotal = 0 , int secondsBetween = 3 ){
+ string getRemoteString( bool includePort = true ) { return _remote.toString(includePort); }
- if ( progressMeterTotal ){
- if ( _progressMeter.isActive() ){
+ ProgressMeter& setMessage( const char * msg , unsigned long long progressMeterTotal = 0 , int secondsBetween = 3 ) {
+ if ( progressMeterTotal ) {
+ if ( _progressMeter.isActive() ) {
cout << "about to assert, old _message: " << _message << " new message:" << msg << endl;
assert( ! _progressMeter.isActive() );
}
@@ -251,38 +254,93 @@ namespace mongo {
else {
_progressMeter.finished();
}
-
+
_message = msg;
-
+
return _progressMeter;
}
-
+
string getMessage() const { return _message.toString(); }
ProgressMeter& getProgressMeter() { return _progressMeter; }
-
+ CurOp *parent() const { return _wrapped; }
+ void kill() { _killed = true; }
+ bool killed() const { return _killed; }
+ void setNS(const char *ns) {
+ strncpy(_ns, ns, Namespace::MaxNsLen);
+ _ns[Namespace::MaxNsLen] = 0;
+ }
friend class Client;
+
+ private:
+ static AtomicUInt _nextOpNum;
+ Client * _client;
+ CurOp * _wrapped;
+ unsigned long long _start;
+ unsigned long long _checkpoint;
+ unsigned long long _end;
+ bool _active;
+ int _op;
+ bool _command;
+ int _lockType; // see concurrency.h for values
+ bool _waitingForLock;
+ int _dbprofile; // 0=off, 1=slow, 2=all
+ AtomicUInt _opNum;
+ char _ns[Namespace::MaxNsLen+2];
+ struct SockAddr _remote;
+ CachedBSONObj _query;
+ OpDebug _debug;
+ ThreadSafeString _message;
+ ProgressMeter _progressMeter;
+ volatile bool _killed;
+
+ void _reset() {
+ _command = false;
+ _lockType = 0;
+ _dbprofile = 0;
+ _end = 0;
+ _waitingForLock = false;
+ _message = "";
+ _progressMeter.finished();
+ _killed = false;
+ }
};
- /* 0 = ok
- 1 = kill current operation and reset this to 0
- future: maybe use this as a "going away" thing on process termination with a higher flag value
+ /* _globalKill: we are shutting down
+ otherwise kill attribute set on specified CurOp
+ this class does not handle races between interruptJs and the checkForInterrupt functions - those must be
+ handled by the client of this class
*/
- extern class KillCurrentOp {
- enum { Off, On, All } state;
- AtomicUInt toKill;
+ extern class KillCurrentOp {
public:
- void killAll() { state = All; }
- void kill(AtomicUInt i) { toKill = i; state = On; }
-
- void checkForInterrupt() {
- if( state != Off ) {
- if( state == All )
- uasserted(11600,"interrupted at shutdown");
- if( cc().curop()->opNum() == toKill ) {
- state = Off;
- uasserted(11601,"interrupted");
- }
- }
+ void killAll();
+ void kill(AtomicUInt i);
+
+ /** @return true if global interrupt and should terminate the operation */
+ bool globalInterruptCheck() const { return _globalKill; }
+
+ void checkForInterrupt( bool heedMutex = true ) {
+ if ( heedMutex && dbMutex.isWriteLocked() )
+ return;
+ if( _globalKill )
+ uasserted(11600,"interrupted at shutdown");
+ if( cc().curop()->killed() )
+ uasserted(11601,"interrupted");
}
+
+ /** @return "" if not interrupted. otherwise, you should stop. */
+ const char *checkForInterruptNoAssert( bool heedMutex = true ) {
+ if ( heedMutex && dbMutex.isWriteLocked() )
+ return "";
+ if( _globalKill )
+ return "interrupted at shutdown";
+ if( cc().curop()->killed() )
+ return "interrupted";
+ return "";
+ }
+
+ private:
+ void interruptJs( AtomicUInt *op );
+ volatile bool _globalKill;
} killCurrentOp;
+
}