summaryrefslogtreecommitdiff
path: root/util
diff options
context:
space:
mode:
Diffstat (limited to 'util')
-rw-r--r--util/allocator.h12
-rw-r--r--util/array.h104
-rw-r--r--util/assert_util.cpp46
-rw-r--r--util/assert_util.h37
-rw-r--r--util/atomic_int.h100
-rw-r--r--util/background.cpp4
-rw-r--r--util/background.h3
-rw-r--r--util/base64.cpp39
-rw-r--r--util/base64.h37
-rw-r--r--util/builder.h8
-rw-r--r--util/debug_util.cpp2
-rw-r--r--util/file_allocator.h39
-rw-r--r--util/goodies.h248
-rw-r--r--util/hashtab.h2
-rw-r--r--util/hex.h35
-rw-r--r--util/httpclient.cpp87
-rw-r--r--util/httpclient.h28
-rw-r--r--util/log.h7
-rw-r--r--util/message.cpp42
-rw-r--r--util/message.h34
-rw-r--r--util/message_server_asio.cpp82
-rw-r--r--util/message_server_port.cpp2
-rw-r--r--util/miniwebserver.cpp31
-rw-r--r--util/miniwebserver.h7
-rw-r--r--util/mmap.cpp25
-rw-r--r--util/mmap.h9
-rw-r--r--util/mmap_mm.cpp7
-rw-r--r--util/mmap_posix.cpp14
-rw-r--r--util/mmap_win.cpp26
-rw-r--r--util/optime.h23
-rw-r--r--util/processinfo.h3
-rw-r--r--util/processinfo_darwin.cpp26
-rw-r--r--util/processinfo_linux2.cpp21
-rw-r--r--util/processinfo_none.cpp9
-rw-r--r--util/processinfo_win32.cpp10
-rw-r--r--util/queue.h12
-rw-r--r--util/sock.cpp4
-rw-r--r--util/sock.h10
-rw-r--r--util/thread_pool.cpp10
-rw-r--r--util/thread_pool.h2
-rw-r--r--util/top.cpp18
-rw-r--r--util/top.h183
-rw-r--r--util/util.cpp18
43 files changed, 1054 insertions, 412 deletions
diff --git a/util/allocator.h b/util/allocator.h
index af8032c..90dbf24 100644
--- a/util/allocator.h
+++ b/util/allocator.h
@@ -34,16 +34,4 @@ namespace mongo {
#define malloc mongo::ourmalloc
#define realloc mongo::ourrealloc
-#if defined(_WIN32)
- inline void our_debug_free(void *p) {
-#if 0
- // this is not safe if you malloc < 4 bytes so we don't use anymore
- unsigned *u = (unsigned *) p;
- u[0] = 0xEEEEEEEE;
-#endif
- free(p);
- }
-#define free our_debug_free
-#endif
-
} // namespace mongo
diff --git a/util/array.h b/util/array.h
new file mode 100644
index 0000000..827c00e
--- /dev/null
+++ b/util/array.h
@@ -0,0 +1,104 @@
+// array.h
+
+namespace mongo {
+
+ template<typename T>
+ class FastArray {
+ public:
+ FastArray( int capacity=10000 )
+ : _capacity( capacity ) , _size(0) , _end(this,capacity){
+ _data = new T[capacity];
+ }
+
+ ~FastArray(){
+ delete[] _data;
+ }
+
+ void clear(){
+ _size = 0;
+ }
+
+ T& operator[]( int x ){
+ assert( x >= 0 && x < _capacity );
+ return _data[x];
+ }
+
+ T& getNext(){
+ return _data[_size++];
+ }
+
+ void push_back( const T& t ){
+ _data[_size++] = t;
+ }
+
+ void sort( int (*comp)(const void *, const void *) ){
+ qsort( _data , _size , sizeof(T) , comp );
+ }
+
+ int size(){
+ return _size;
+ }
+
+ bool hasSpace(){
+ return _size < _capacity;
+ }
+ class iterator {
+ public:
+ iterator(){
+ _it = 0;
+ _pos = 0;
+ }
+
+ iterator( FastArray * it , int pos=0 ){
+ _it = it;
+ _pos = pos;
+ }
+
+ bool operator==(const iterator& other ) const {
+ return _pos == other._pos;
+ }
+
+ bool operator!=(const iterator& other ) const {
+ return _pos != other._pos;
+ }
+
+ void operator++(){
+ _pos++;
+ }
+
+ T& operator*(){
+ return _it->_data[_pos];
+ }
+
+ operator string() const {
+ stringstream ss;
+ ss << _pos;
+ return ss.str();
+ }
+ private:
+ FastArray * _it;
+ int _pos;
+
+ friend class FastArray;
+ };
+
+
+ iterator begin(){
+ return iterator(this);
+ }
+
+ iterator end(){
+ _end._pos = _size;
+ return _end;
+ }
+
+
+ private:
+ int _capacity;
+ int _size;
+
+ iterator _end;
+
+ T * _data;
+ };
+}
diff --git a/util/assert_util.cpp b/util/assert_util.cpp
index d1d85b2..8c8477a 100644
--- a/util/assert_util.cpp
+++ b/util/assert_util.cpp
@@ -22,6 +22,26 @@
namespace mongo {
+ AssertionCount assertionCount;
+
+ AssertionCount::AssertionCount()
+ : regular(0),warning(0),msg(0),user(0),rollovers(0){
+ }
+
+ void AssertionCount::rollover(){
+ rollovers++;
+ regular = 0;
+ warning = 0;
+ msg = 0;
+ user = 0;
+ }
+
+ void AssertionCount::condrollover( int newvalue ){
+ static int max = (int)pow( 2.0 , 30 );
+ if ( newvalue >= max )
+ rollover();
+ }
+
string getDbContext();
Assertion lastAssert[4];
@@ -32,9 +52,11 @@ namespace mongo {
sayDbContext();
raiseError(0,msg && *msg ? msg : "wassertion failure");
lastAssert[1].set(msg, getDbContext().c_str(), file, line);
+ assertionCount.condrollover( ++assertionCount.warning );
}
void asserted(const char *msg, const char *file, unsigned line) {
+ assertionCount.condrollover( ++assertionCount.regular );
problem() << "Assertion failure " << msg << ' ' << file << ' ' << dec << line << endl;
sayDbContext();
raiseError(0,msg && *msg ? msg : "assertion failure");
@@ -54,6 +76,7 @@ namespace mongo {
int uacount = 0;
void uasserted(int msgid, const char *msg) {
+ assertionCount.condrollover( ++assertionCount.user );
if ( ++uacount < 100 )
log() << "User Exception " << msgid << ":" << msg << endl;
else
@@ -64,6 +87,7 @@ namespace mongo {
}
void msgasserted(int msgid, const char *msg) {
+ assertionCount.condrollover( ++assertionCount.warning );
log() << "Assertion: " << msgid << ":" << msg << endl;
lastAssert[2].set(msg, getDbContext().c_str(), "", 0);
raiseError(msgid,msg && *msg ? msg : "massert failure");
@@ -72,13 +96,22 @@ namespace mongo {
throw MsgAssertionException(msgid, msg);
}
- boost::mutex *Assertion::_mutex = new boost::mutex();
+ void streamNotGood( int code , string msg , std::ios& myios ){
+ stringstream ss;
+ // errno might not work on all systems for streams
+ // if it doesn't for a system should deal with here
+ ss << msg << " stream invalie: " << OUTPUT_ERRNO;
+ throw UserException( code , ss.str() );
+ }
+
+
+ mongo::mutex *Assertion::_mutex = new mongo::mutex();
string Assertion::toString() {
if( _mutex == 0 )
return "";
- boostlock lk(*_mutex);
+ scoped_lock lk(*_mutex);
if ( !isSet() )
return "";
@@ -166,5 +199,14 @@ namespace mongo {
void rotateLogs( int signal ){
loggingManager.rotate();
}
+
+ string errnostring( const char * prefix ){
+ stringstream ss;
+ if ( prefix )
+ ss << prefix << ": ";
+ ss << OUTPUT_ERRNO;
+ return ss.str();
+ }
+
}
diff --git a/util/assert_util.h b/util/assert_util.h
index ccb60a0..bae3a55 100644
--- a/util/assert_util.h
+++ b/util/assert_util.h
@@ -32,7 +32,7 @@ namespace mongo {
when = 0;
}
private:
- static boost::mutex *_mutex;
+ static mongo::mutex *_mutex;
char msg[128];
char context[128];
const char *file;
@@ -44,7 +44,7 @@ namespace mongo {
/* asserted during global variable initialization */
return;
}
- boostlock lk(*_mutex);
+ scoped_lock lk(*_mutex);
strncpy(msg, m, 127);
strncpy(context, ctxt, 127);
file = f;
@@ -67,6 +67,21 @@ namespace mongo {
/* last assert of diff types: regular, wassert, msgassert, uassert: */
extern Assertion lastAssert[4];
+ class AssertionCount {
+ public:
+ AssertionCount();
+ void rollover();
+ void condrollover( int newValue );
+
+ int regular;
+ int warning;
+ int msg;
+ int user;
+ int rollovers;
+ };
+
+ extern AssertionCount assertionCount;
+
class DBException : public std::exception {
public:
virtual const char* what() const throw() = 0;
@@ -91,6 +106,11 @@ namespace mongo {
}
virtual int getCode(){ return code; }
virtual const char* what() const throw() { return msg.c_str(); }
+
+ /* true if an interrupted exception - see KillCurrentOp */
+ bool interrupted() {
+ return code == 11600 || code == 11601;
+ }
};
/* UserExceptions are valid errors that a user can cause, like out of disk space or duplicate key */
@@ -173,6 +193,10 @@ namespace mongo {
#define ASSERT_ID_DUPKEY 11000
+ void streamNotGood( int code , string msg , std::ios& myios );
+
+#define ASSERT_STREAM_GOOD(msgid,msg,stream) (void)( (!!((stream).good())) || (mongo::streamNotGood(msgid, msg, stream), 0) )
+
} // namespace mongo
#define BOOST_CHECK_EXCEPTION( expression ) \
@@ -184,3 +208,12 @@ namespace mongo {
} catch ( ... ) { \
massert( 10437 , "unknown boost failed" , false ); \
}
+
+#define DESTRUCTOR_GUARD( expression ) \
+ try { \
+ expression; \
+ } catch ( const std::exception &e ) { \
+ problem() << "caught exception (" << e.what() << ") in destructor (" << __FUNCTION__ << ")" << endl; \
+ } catch ( ... ) { \
+ problem() << "caught unknown exception in destructor (" << __FUNCTION__ << ")" << endl; \
+ }
diff --git a/util/atomic_int.h b/util/atomic_int.h
new file mode 100644
index 0000000..de50560
--- /dev/null
+++ b/util/atomic_int.h
@@ -0,0 +1,100 @@
+// atomic_int.h
+// atomic wrapper for unsigned
+
+/* Copyright 2009 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#if defined(_WIN32)
+# include <windows.h>
+#endif
+
+namespace mongo{
+
+
+ struct AtomicUInt{
+ AtomicUInt() : x(0) {}
+ AtomicUInt(unsigned z) : x(z) { }
+ volatile unsigned x;
+ operator unsigned() const {
+ return x;
+ }
+ inline AtomicUInt operator++(); // ++prefix
+ inline AtomicUInt operator++(int);// postfix++
+ inline AtomicUInt operator--(); // --prefix
+ inline AtomicUInt operator--(int); // postfix--
+ };
+
+#if defined(_WIN32)
+ AtomicUInt AtomicUInt::operator++(){
+ // InterlockedIncrement returns the new value
+ return InterlockedIncrement((volatile long*)&x); //long is 32bits in Win64
+ }
+ AtomicUInt AtomicUInt::operator++(int){
+ return InterlockedIncrement((volatile long*)&x)-1;
+ }
+ AtomicUInt AtomicUInt::operator--(){
+ return InterlockedDecrement((volatile long*)&x);
+ }
+ AtomicUInt AtomicUInt::operator--(int){
+ return InterlockedDecrement((volatile long*)&x)+1;
+ }
+#elif defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4)
+ // this is in GCC >= 4.1
+ AtomicUInt AtomicUInt::operator++(){
+ return __sync_add_and_fetch(&x, 1);
+ }
+ AtomicUInt AtomicUInt::operator++(int){
+ return __sync_fetch_and_add(&x, 1);
+ }
+ AtomicUInt AtomicUInt::operator--(){
+ return __sync_add_and_fetch(&x, -1);
+ }
+ AtomicUInt AtomicUInt::operator--(int){
+ return __sync_fetch_and_add(&x, -1);
+ }
+#elif defined(__GNUC__) && (defined(__i386__) || defined(__x86_64__))
+ // from boost 1.39 interprocess/detail/atomic.hpp
+
+ inline unsigned atomic_int_helper(volatile unsigned *x, int val){
+ int r;
+ asm volatile
+ (
+ "lock\n\t"
+ "xadd %1, %0":
+ "+m"( *x ), "=r"( r ): // outputs (%0, %1)
+ "1"( val ): // inputs (%2 == %1)
+ "memory", "cc" // clobbers
+ );
+ return r;
+ }
+ AtomicUInt AtomicUInt::operator++(){
+ return atomic_int_helper(&x, 1)+1;
+ }
+ AtomicUInt AtomicUInt::operator++(int){
+ return atomic_int_helper(&x, 1);
+ }
+ AtomicUInt AtomicUInt::operator--(){
+ return atomic_int_helper(&x, -1)-1;
+ }
+ AtomicUInt AtomicUInt::operator--(int){
+ return atomic_int_helper(&x, -1);
+ }
+#else
+# error "unsupported compiler or platform"
+#endif
+
+} // namespace mongo
diff --git a/util/background.cpp b/util/background.cpp
index ac3a48c..4125315 100644
--- a/util/background.cpp
+++ b/util/background.cpp
@@ -22,7 +22,7 @@
namespace mongo {
BackgroundJob *BackgroundJob::grab = 0;
- boost::mutex &BackgroundJob::mutex = *( new boost::mutex );
+ mongo::mutex BackgroundJob::mutex;
/* static */
void BackgroundJob::thr() {
@@ -38,7 +38,7 @@ namespace mongo {
}
BackgroundJob& BackgroundJob::go() {
- boostlock bl(mutex);
+ scoped_lock bl(mutex);
assert( grab == 0 );
grab = this;
boost::thread t(thr);
diff --git a/util/background.h b/util/background.h
index ff044cb..c95a5bd 100644
--- a/util/background.h
+++ b/util/background.h
@@ -27,7 +27,6 @@ namespace mongo {
has finished. Thus one pattern of use is to embed a backgroundjob
in your object and reuse it (or same thing with inheritance).
*/
-
class BackgroundJob {
protected:
/* define this to do your work! */
@@ -65,7 +64,7 @@ namespace mongo {
private:
static BackgroundJob *grab;
- static boost::mutex &mutex;
+ static mongo::mutex mutex;
static void thr();
volatile State state;
};
diff --git a/util/base64.cpp b/util/base64.cpp
index cf2f485..8d9d544 100644
--- a/util/base64.cpp
+++ b/util/base64.cpp
@@ -17,48 +17,13 @@
*/
#include "stdafx.h"
+#include "base64.h"
namespace mongo {
namespace base64 {
- class Alphabet {
- public:
- Alphabet(){
- encode = (unsigned char*)
- "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
- "abcdefghijklmnopqrstuvwxyz"
- "0123456789"
- "+/";
-
- decode = (unsigned char*)malloc(257);
- memset( decode , 0 , 256 );
- for ( int i=0; i<64; i++ ){
- decode[ encode[i] ] = i;
- }
-
- test();
- }
- ~Alphabet(){
- free( decode );
- }
+ Alphabet alphabet;
- void test(){
- assert( strlen( (char*)encode ) == 64 );
- for ( int i=0; i<26; i++ )
- assert( encode[i] == toupper( encode[i+26] ) );
- }
-
- char e( int x ){
- return encode[x&0x3f];
- }
-
- private:
- const unsigned char * encode;
- public:
- unsigned char * decode;
- } alphabet;
-
-
void encode( stringstream& ss , const char * data , int size ){
for ( int i=0; i<size; i+=3 ){
int left = size - i;
diff --git a/util/base64.h b/util/base64.h
index 62caceb..c113eed 100644
--- a/util/base64.h
+++ b/util/base64.h
@@ -15,10 +15,47 @@
* limitations under the License.
*/
+#pragma once
namespace mongo {
namespace base64 {
+ class Alphabet {
+ public:
+ Alphabet()
+ : encode((unsigned char*)
+ "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
+ "abcdefghijklmnopqrstuvwxyz"
+ "0123456789"
+ "+/")
+ , decode(new unsigned char[257])
+ {
+ memset( decode.get() , 0 , 256 );
+ for ( int i=0; i<64; i++ ){
+ decode[ encode[i] ] = i;
+ }
+
+ test();
+ }
+ void test(){
+ assert( strlen( (char*)encode ) == 64 );
+ for ( int i=0; i<26; i++ )
+ assert( encode[i] == toupper( encode[i+26] ) );
+ }
+
+ char e( int x ){
+ return encode[x&0x3f];
+ }
+
+ private:
+ const unsigned char * encode;
+ public:
+ boost::scoped_array<unsigned char> decode;
+ };
+
+ extern Alphabet alphabet;
+
+
void encode( stringstream& ss , const char * data , int size );
string encode( const char * data , int size );
string encode( const string& s );
diff --git a/util/builder.h b/util/builder.h
index 5046b72..f9d3514 100644
--- a/util/builder.h
+++ b/util/builder.h
@@ -90,7 +90,7 @@ namespace mongo {
append<double>(j);
}
- void append(const void *src, int len) {
+ void append(const void *src, size_t len) {
memcpy(grow(len), src, len);
}
@@ -102,6 +102,10 @@ namespace mongo {
append( (void *)str.c_str(), str.length() + 1 );
}
+ void append( int val , int padding ){
+
+ }
+
int len() const {
return l;
}
@@ -197,7 +201,7 @@ namespace mongo {
}
string str(){
- return string(_buf.data,0,_buf.l);
+ return string(_buf.data, _buf.l);
}
private:
diff --git a/util/debug_util.cpp b/util/debug_util.cpp
index 283053f..9c2f5dc 100644
--- a/util/debug_util.cpp
+++ b/util/debug_util.cpp
@@ -21,7 +21,7 @@
namespace mongo {
-#if defined(_DEBUG) && !defined(_WIN32)
+#if defined(USE_GDBSERVER)
/* Magic gdb trampoline
* Do not call directly! call setupSIGTRAPforGDB()
* Assumptions:
diff --git a/util/file_allocator.h b/util/file_allocator.h
index 73159d3..93b2b1c 100644
--- a/util/file_allocator.h
+++ b/util/file_allocator.h
@@ -54,7 +54,7 @@ namespace mongo {
on windows anyway as we don't have to pre-zero the file there.
*/
#if !defined(_WIN32)
- boostlock lk( pendingMutex_ );
+ scoped_lock lk( pendingMutex_ );
if ( failed_ )
return;
long oldSize = prevSize( name );
@@ -71,7 +71,7 @@ namespace mongo {
// updated to match existing file size.
void allocateAsap( const string &name, long &size ) {
#if !defined(_WIN32)
- boostlock lk( pendingMutex_ );
+ scoped_lock lk( pendingMutex_ );
long oldSize = prevSize( name );
if ( oldSize != -1 ) {
size = oldSize;
@@ -91,7 +91,7 @@ namespace mongo {
pendingUpdated_.notify_all();
while( inProgress( name ) ) {
checkFailure();
- pendingUpdated_.wait( lk );
+ pendingUpdated_.wait( lk.boost() );
}
#endif
}
@@ -100,9 +100,9 @@ namespace mongo {
#if !defined(_WIN32)
if ( failed_ )
return;
- boostlock lk( pendingMutex_ );
+ scoped_lock lk( pendingMutex_ );
while( pending_.size() != 0 )
- pendingUpdated_.wait( lk );
+ pendingUpdated_.wait( lk.boost() );
#endif
}
@@ -130,7 +130,7 @@ namespace mongo {
return false;
}
- mutable boost::mutex pendingMutex_;
+ mutable mongo::mutex pendingMutex_;
mutable boost::condition pendingUpdated_;
list< string > pending_;
mutable map< string, long > pendingSize_;
@@ -142,21 +142,22 @@ namespace mongo {
void operator()() {
while( 1 ) {
{
- boostlock lk( a_.pendingMutex_ );
+ scoped_lock lk( a_.pendingMutex_ );
if ( a_.pending_.size() == 0 )
- a_.pendingUpdated_.wait( lk );
+ a_.pendingUpdated_.wait( lk.boost() );
}
while( 1 ) {
string name;
long size;
{
- boostlock lk( a_.pendingMutex_ );
+ scoped_lock lk( a_.pendingMutex_ );
if ( a_.pending_.size() == 0 )
break;
name = a_.pending_.front();
size = a_.pendingSize_[ name ];
}
try {
+ log() << "allocating new datafile " << name << ", filling with zeroes..." << endl;
long fd = open(name.c_str(), O_CREAT | O_RDWR | O_NOATIME, S_IRUSR | S_IWUSR);
if ( fd <= 0 ) {
stringstream ss;
@@ -180,19 +181,19 @@ namespace mongo {
massert( 10442 , "Unable to allocate file of desired size",
1 == write(fd, "", 1) );
lseek(fd, 0, SEEK_SET);
- log() << "allocating new datafile " << name << ", filling with zeroes..." << endl;
Timer t;
long z = 256 * 1024;
char buf[z];
memset(buf, 0, z);
long left = size;
- while ( 1 ) {
- if ( left <= z ) {
- massert( 10443 , "write failed", left == write(fd, buf, left) );
- break;
- }
- massert( 10444 , "write failed", z == write(fd, buf, z) );
- left -= z;
+ while ( left > 0 ) {
+ long towrite = left;
+ if ( towrite > z )
+ towrite = z;
+
+ int written = write( fd , buf , towrite );
+ massert( 10443 , errnostring("write failed" ), written > 0 );
+ left -= written;
}
log() << "done allocating datafile " << name << ", size: " << size/1024/1024 << "MB, took " << ((double)t.millis())/1000.0 << " secs" << endl;
}
@@ -205,7 +206,7 @@ namespace mongo {
BOOST_CHECK_EXCEPTION( boost::filesystem::remove( name ) );
} catch ( ... ) {
}
- boostlock lk( a_.pendingMutex_ );
+ scoped_lock lk( a_.pendingMutex_ );
a_.failed_ = true;
// not erasing from pending
a_.pendingUpdated_.notify_all();
@@ -213,7 +214,7 @@ namespace mongo {
}
{
- boostlock lk( a_.pendingMutex_ );
+ scoped_lock lk( a_.pendingMutex_ );
a_.pendingSize_.erase( name );
a_.pending_.pop_front();
a_.pendingUpdated_.notify_all();
diff --git a/util/goodies.h b/util/goodies.h
index 7eebc0a..4641941 100644
--- a/util/goodies.h
+++ b/util/goodies.h
@@ -24,7 +24,7 @@
namespace mongo {
-#if !defined(_WIN32) && !defined(NOEXECINFO)
+#if !defined(_WIN32) && !defined(NOEXECINFO) && !defined(__freebsd__) && !defined(__sun__)
} // namespace mongo
@@ -120,36 +120,11 @@ namespace mongo {
x = 0;
}
WrappingInt(unsigned z) : x(z) { }
- volatile unsigned x;
+ unsigned x;
operator unsigned() const {
return x;
}
- // returns original value (like x++)
- WrappingInt atomicIncrement(){
-#if defined(_WIN32)
- // InterlockedIncrement returns the new value
- return InterlockedIncrement((volatile long*)&x)-1; //long is 32bits in Win64
-#elif defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4)
- // this is in GCC >= 4.1
- return __sync_fetch_and_add(&x, 1);
-#elif defined(__GNUC__) && (defined(__i386__) || defined(__x86_64__))
- // from boost 1.39 interprocess/detail/atomic.hpp
- int r;
- int val = 1;
- asm volatile
- (
- "lock\n\t"
- "xadd %1, %0":
- "+m"( x ), "=r"( r ): // outputs (%0, %1)
- "1"( val ): // inputs (%2 == %1)
- "memory", "cc" // clobbers
- );
- return r;
-#else
-# error "unsupported compiler or platform"
-#endif
- }
static int diff(unsigned a, unsigned b) {
return a-b;
@@ -179,6 +154,23 @@ namespace mongo {
buf[24] = 0; // don't want the \n
}
+
+ inline void time_t_to_Struct(time_t t, struct tm * buf , bool local = false ) {
+#if defined(_WIN32)
+ if ( local )
+ localtime_s( buf , &t );
+ else
+ gmtime_s(buf, &t);
+#else
+ if ( local )
+ localtime_r(&t, buf);
+ else
+ gmtime_r(&t, buf);
+#endif
+ }
+
+
+
#define asctime _asctime_not_threadsafe_
#define gmtime _gmtime_not_threadsafe_
#define localtime _localtime_not_threadsafe_
@@ -278,8 +270,42 @@ namespace mongo {
return secs*1000000 + t;
}
using namespace boost;
- typedef boost::mutex::scoped_lock boostlock;
- typedef boost::recursive_mutex::scoped_lock recursive_boostlock;
+
+ extern bool __destroyingStatics;
+
+ // If you create a local static instance of this class, that instance will be destroyed
+ // before all global static objects are destroyed, so __destroyingStatics will be set
+ // to true before the global static variables are destroyed.
+ class StaticObserver : boost::noncopyable {
+ public:
+ ~StaticObserver() { __destroyingStatics = true; }
+ };
+
+ // On pthread systems, it is an error to destroy a mutex while held. Static global
+ // mutexes may be held upon shutdown in our implementation, and this way we avoid
+ // destroying them.
+ class mutex : boost::noncopyable {
+ public:
+ mutex() { new (_buf) boost::mutex(); }
+ ~mutex() {
+ if( !__destroyingStatics ) {
+ boost().boost::mutex::~mutex();
+ }
+ }
+ class scoped_lock : boost::noncopyable {
+ public:
+ scoped_lock( mongo::mutex &m ) : _l( m.boost() ) {}
+ boost::mutex::scoped_lock &boost() { return _l; }
+ private:
+ boost::mutex::scoped_lock _l;
+ };
+ private:
+ boost::mutex &boost() { return *( boost::mutex * )( _buf ); }
+ char _buf[ sizeof( boost::mutex ) ];
+ };
+
+ typedef mongo::mutex::scoped_lock scoped_lock;
+ typedef boost::recursive_mutex::scoped_lock recursive_scoped_lock;
// simple scoped timer
class Timer {
@@ -318,7 +344,7 @@ namespace mongo {
class DebugMutex : boost::noncopyable {
friend class lock;
- boost::mutex m;
+ mongo::mutex m;
int locked;
public:
DebugMutex() : locked(0); { }
@@ -327,17 +353,17 @@ namespace mongo {
*/
-//typedef boostlock lock;
+//typedef scoped_lock lock;
inline bool startsWith(const char *str, const char *prefix) {
- unsigned l = strlen(prefix);
+ size_t l = strlen(prefix);
if ( strlen(str) < l ) return false;
return strncmp(str, prefix, l) == 0;
}
inline bool endsWith(const char *p, const char *suffix) {
- int a = strlen(p);
- int b = strlen(suffix);
+ size_t a = strlen(p);
+ size_t b = strlen(suffix);
if ( b > a ) return false;
return strcmp(p + a - b, suffix) == 0;
}
@@ -418,12 +444,39 @@ namespace mongo {
class ProgressMeter {
public:
- ProgressMeter( long long total , int secondsBetween = 3 , int checkInterval = 100 )
- : _total( total ) , _secondsBetween( secondsBetween ) , _checkInterval( checkInterval ) ,
- _done(0) , _hits(0) , _lastTime( (int) time(0) ){
+ ProgressMeter( long long total , int secondsBetween = 3 , int checkInterval = 100 ){
+ reset( total , secondsBetween , checkInterval );
+ }
+
+ ProgressMeter(){
+ _active = 0;
+ }
+
+ void reset( long long total , int secondsBetween = 3 , int checkInterval = 100 ){
+ _total = total;
+ _secondsBetween = secondsBetween;
+ _checkInterval = checkInterval;
+
+ _done = 0;
+ _hits = 0;
+ _lastTime = (int)time(0);
+
+ _active = 1;
+ }
+
+ void finished(){
+ _active = 0;
+ }
+
+ bool isActive(){
+ return _active;
}
bool hit( int n = 1 ){
+ if ( ! _active ){
+ cout << "warning: hit on in-active ProgressMeter" << endl;
+ }
+
_done += n;
_hits++;
if ( _hits % _checkInterval )
@@ -449,7 +502,16 @@ namespace mongo {
return _hits;
}
+ string toString() const {
+ if ( ! _active )
+ return "";
+ stringstream buf;
+ buf << _done << "/" << _total << " " << (_done*100)/_total << "%";
+ return buf.str();
+ }
private:
+
+ bool _active;
long long _total;
int _secondsBetween;
@@ -468,7 +530,7 @@ namespace mongo {
}
bool tryAcquire(){
- boostlock lk( _mutex );
+ scoped_lock lk( _mutex );
if ( _num <= 0 ){
if ( _num < 0 ){
cerr << "DISASTER! in TicketHolder" << endl;
@@ -480,12 +542,12 @@ namespace mongo {
}
void release(){
- boostlock lk( _mutex );
+ scoped_lock lk( _mutex );
_num++;
}
void resize( int newSize ){
- boostlock lk( _mutex );
+ scoped_lock lk( _mutex );
int used = _outof - _num;
if ( used > newSize ){
cout << "ERROR: can't resize since we're using (" << used << ") more than newSize(" << newSize << ")" << endl;
@@ -507,7 +569,7 @@ namespace mongo {
private:
int _outof;
int _num;
- boost::mutex _mutex;
+ mongo::mutex _mutex;
};
class TicketHolderReleaser {
@@ -523,4 +585,108 @@ namespace mongo {
TicketHolder * _holder;
};
+
+ /**
+ * this is a thread safe string
+ * you will never get a bad pointer, though data may be mungedd
+ */
+ class ThreadSafeString {
+ public:
+ ThreadSafeString( size_t size=256 )
+ : _size( 256 ) , _buf( new char[256] ){
+ memset( _buf , 0 , _size );
+ }
+
+ ThreadSafeString( const ThreadSafeString& other )
+ : _size( other._size ) , _buf( new char[_size] ){
+ strncpy( _buf , other._buf , _size );
+ }
+
+ ~ThreadSafeString(){
+ delete[] _buf;
+ _buf = 0;
+ }
+
+ operator string() const {
+ string s = _buf;
+ return s;
+ }
+
+ ThreadSafeString& operator=( const char * str ){
+ size_t s = strlen(str);
+ if ( s >= _size - 2 )
+ s = _size - 2;
+ strncpy( _buf , str , s );
+ _buf[s] = 0;
+ return *this;
+ }
+
+ bool operator==( const ThreadSafeString& other ) const {
+ return strcmp( _buf , other._buf ) == 0;
+ }
+
+ bool operator==( const char * str ) const {
+ return strcmp( _buf , str ) == 0;
+ }
+
+ bool operator!=( const char * str ) const {
+ return strcmp( _buf , str );
+ }
+
+ bool empty() const {
+ return _buf[0] == 0;
+ }
+
+ private:
+ size_t _size;
+ char * _buf;
+ };
+
+ ostream& operator<<( ostream &s, const ThreadSafeString &o );
+
+ inline bool isNumber( char c ) {
+ return c >= '0' && c <= '9';
+ }
+
+ // for convenience, '{' is greater than anything and stops number parsing
+ inline int lexNumCmp( const char *s1, const char *s2 ) {
+ int nret = 0;
+ while( *s1 && *s2 ) {
+ bool p1 = ( *s1 == '{' );
+ bool p2 = ( *s2 == '{' );
+ if ( p1 && !p2 )
+ return 1;
+ if ( p2 && !p1 )
+ return -1;
+ bool n1 = isNumber( *s1 );
+ bool n2 = isNumber( *s2 );
+ if ( n1 && n2 ) {
+ if ( nret == 0 ) {
+ nret = *s1 > *s2 ? 1 : ( *s1 == *s2 ? 0 : -1 );
+ }
+ } else if ( n1 ) {
+ return 1;
+ } else if ( n2 ) {
+ return -1;
+ } else {
+ if ( nret ) {
+ return nret;
+ }
+ if ( *s1 > *s2 ) {
+ return 1;
+ } else if ( *s2 > *s1 ) {
+ return -1;
+ }
+ nret = 0;
+ }
+ ++s1; ++s2;
+ }
+ if ( *s1 ) {
+ return 1;
+ } else if ( *s2 ) {
+ return -1;
+ }
+ return nret;
+ }
+
} // namespace mongo
diff --git a/util/hashtab.h b/util/hashtab.h
index 214c0ae..d46591c 100644
--- a/util/hashtab.h
+++ b/util/hashtab.h
@@ -149,7 +149,7 @@ namespace mongo {
typedef void (*IteratorCallback)( const Key& k , Type& v );
- void iterall( IteratorCallback callback ){
+ void iterAll( IteratorCallback callback ){
for ( int i=0; i<n; i++ ){
if ( ! nodes[i].inUse() )
continue;
diff --git a/util/hex.h b/util/hex.h
new file mode 100644
index 0000000..cef3e80
--- /dev/null
+++ b/util/hex.h
@@ -0,0 +1,35 @@
+// util/hex.h
+
+/* Copyright 2009 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+namespace mongo {
+ //can't use hex namespace because it conflicts with hex iostream function
+ inline int fromHex( char c ) {
+ if ( '0' <= c && c <= '9' )
+ return c - '0';
+ if ( 'a' <= c && c <= 'f' )
+ return c - 'a' + 10;
+ if ( 'A' <= c && c <= 'F' )
+ return c - 'A' + 10;
+ assert( false );
+ return 0xff;
+ }
+ inline char fromHex( const char *c ) {
+ return ( fromHex( c[ 0 ] ) << 4 ) | fromHex( c[ 1 ] );
+ }
+}
diff --git a/util/httpclient.cpp b/util/httpclient.cpp
index 284bb63..08b6220 100644
--- a/util/httpclient.cpp
+++ b/util/httpclient.cpp
@@ -17,10 +17,25 @@
#include "stdafx.h"
#include "httpclient.h"
+#include "sock.h"
+#include "message.h"
+#include "builder.h"
namespace mongo {
- int HttpClient::get( string url , map<string,string>& headers, stringstream& data ){
+ //#define HD(x) cout << x << endl;
+#define HD(x)
+
+
+ int HttpClient::get( string url , Result * result ){
+ return _go( "GET" , url , 0 , result );
+ }
+
+ int HttpClient::post( string url , string data , Result * result ){
+ return _go( "POST" , url , data.c_str() , result );
+ }
+
+ int HttpClient::_go( const char * command , string url , const char * body , Result * result ){
uassert( 10271 , "invalid url" , url.find( "http://" ) == 0 );
url = url.substr( 7 );
@@ -34,28 +49,84 @@ namespace mongo {
path = url.substr( url.find( "/" ) );
}
+
+ HD( "host [" << host << "]" );
+ HD( "path [" << path << "]" );
+
+ string server = host;
int port = 80;
- uassert( 10272 , "non standard port not supported yet" , host.find( ":" ) == string::npos );
- cout << "host [" << host << "]" << endl;
- cout << "path [" << path << "]" << endl;
- cout << "port: " << port << endl;
+ string::size_type idx = host.find( ":" );
+ if ( idx != string::npos ){
+ server = host.substr( 0 , idx );
+ string t = host.substr( idx + 1 );
+ port = atoi( t.c_str() );
+ }
+
+ HD( "server [" << server << "]" );
+ HD( "port [" << port << "]" );
string req;
{
stringstream ss;
- ss << "GET " << path << " HTTP/1.1\r\n";
+ ss << command << " " << path << " HTTP/1.1\r\n";
ss << "Host: " << host << "\r\n";
ss << "Connection: Close\r\n";
ss << "User-Agent: mongodb http client\r\n";
+ if ( body ) {
+ ss << "Content-Length: " << strlen( body ) << "\r\n";
+ }
ss << "\r\n";
+ if ( body ) {
+ ss << body;
+ }
req = ss.str();
}
+
+ SockAddr addr( server.c_str() , port );
+ HD( "addr: " << addr.toString() );
+
+ MessagingPort p;
+ if ( ! p.connect( addr ) )
+ return -1;
+
+ {
+ const char * out = req.c_str();
+ int toSend = req.size();
+ while ( toSend ){
+ int did = p.send( out , toSend );
+ toSend -= did;
+ out += did;
+ }
+ }
+
+ char buf[4096];
+ int got = p.recv( buf , 4096 );
+ buf[got] = 0;
+
+ int rc;
+ char version[32];
+ assert( sscanf( buf , "%s %d" , version , &rc ) == 2 );
+ HD( "rc: " << rc );
+
+ StringBuilder sb;
+ if ( result )
+ sb << buf;
+
+ while ( ( got = p.recv( buf , 4096 ) ) > 0){
+ if ( result )
+ sb << buf;
+ }
- cout << req << endl;
+ if ( result ){
+ result->_code = rc;
+ result->_entireResponse = sb.str();
+ }
- return -1;
+ return rc;
}
+
+
}
diff --git a/util/httpclient.h b/util/httpclient.h
index 14f0d87..ef3e147 100644
--- a/util/httpclient.h
+++ b/util/httpclient.h
@@ -23,7 +23,33 @@ namespace mongo {
class HttpClient {
public:
- int get( string url , map<string,string>& headers, stringstream& data );
+
+ class Result {
+ public:
+ Result(){}
+
+ const string& getEntireResponse() const {
+ return _entireResponse;
+ }
+ private:
+ int _code;
+ string _entireResponse;
+ friend class HttpClient;
+ };
+
+ /**
+ * @return response code
+ */
+ int get( string url , Result * result = 0 );
+
+ /**
+ * @return response code
+ */
+ int post( string url , string body , Result * result = 0 );
+
+ private:
+ int _go( const char * command , string url , const char * body , Result * result );
+
};
}
diff --git a/util/log.h b/util/log.h
index a9f43c8..668557a 100644
--- a/util/log.h
+++ b/util/log.h
@@ -18,6 +18,7 @@
#pragma once
#include <string.h>
+#include <errno.h>
namespace mongo {
@@ -117,7 +118,7 @@ namespace mongo {
#define LOGIT { ss << x; return *this; }
class Logstream : public Nullstream {
- static boost::mutex &mutex;
+ static mongo::mutex mutex;
static int doneSetup;
stringstream ss;
public:
@@ -127,7 +128,7 @@ namespace mongo {
void flush() {
// this ensures things are sane
if ( doneSetup == 1717 ){
- boostlock lk(mutex);
+ scoped_lock lk(mutex);
cout << ss.str();
cout.flush();
}
@@ -244,4 +245,6 @@ namespace mongo {
#define OUTPUT_ERRNOX(x) "errno:" << x << " " << strerror(x)
#define OUTPUT_ERRNO OUTPUT_ERRNOX(errno)
+ string errnostring( const char * prefix = 0 );
+
} // namespace mongo
diff --git a/util/message.cpp b/util/message.cpp
index 0fbc2d2..2c3d006 100644
--- a/util/message.cpp
+++ b/util/message.cpp
@@ -35,9 +35,11 @@ namespace mongo {
#define mmm(x)
#ifdef MSG_NOSIGNAL
- const int portSendFlags = MSG_NOSIGNAL;
+ const int portSendFlags = MSG_NOSIGNAL;
+ const int portRecvFlags = MSG_NOSIGNAL;
#else
- const int portSendFlags = 0;
+ const int portSendFlags = 0;
+ const int portRecvFlags = 0;
#endif
/* listener ------------------------------------------------------------------- */
@@ -72,7 +74,7 @@ namespace mongo {
void Listener::listen() {
static long connNumber = 0;
SockAddr from;
- while ( 1 ) {
+ while ( ! inShutdown() ) {
int s = accept(sock, (sockaddr *) &from.sa, &from.addressSize);
if ( s < 0 ) {
if ( errno == ECONNABORTED || errno == EBADF ) {
@@ -117,7 +119,7 @@ namespace mongo {
if ( _buf == _cur )
return 0;
- int x = ::send( _port->sock , _buf , len() , 0 );
+ int x = _port->send( _buf , len() );
_cur = _buf;
return x;
}
@@ -136,23 +138,22 @@ namespace mongo {
class Ports {
set<MessagingPort*>& ports;
- boost::mutex& m;
+ mongo::mutex m;
public:
// we "new" this so it is still be around when other automatic global vars
// are being destructed during termination.
- Ports() : ports( *(new set<MessagingPort*>()) ),
- m( *(new boost::mutex()) ) { }
+ Ports() : ports( *(new set<MessagingPort*>()) ) {}
void closeAll() { \
- boostlock bl(m);
+ scoped_lock bl(m);
for ( set<MessagingPort*>::iterator i = ports.begin(); i != ports.end(); i++ )
(*i)->shutdown();
}
void insert(MessagingPort* p) {
- boostlock bl(m);
+ scoped_lock bl(m);
ports.insert(p);
}
void erase(MessagingPort* p) {
- boostlock bl(m);
+ scoped_lock bl(m);
ports.erase(p);
}
} ports;
@@ -263,7 +264,7 @@ again:
char *lenbuf = (char *) &len;
int lft = 4;
while ( 1 ) {
- int x = ::recv(sock, lenbuf, lft, 0);
+ int x = recv( lenbuf, lft );
if ( x == 0 ) {
DEV out() << "MessagingPort recv() conn closed? " << farEnd.toString() << endl;
m.reset();
@@ -286,7 +287,7 @@ again:
if ( len == -1 ) {
// Endian check from the database, after connecting, to see what mode server is running in.
unsigned foo = 0x10203040;
- int x = ::send(sock, (char *) &foo, 4, portSendFlags );
+ int x = send( (char *) &foo, 4 );
if ( x <= 0 ) {
log() << "MessagingPort endian send() " << OUTPUT_ERRNO << ' ' << farEnd.toString() << endl;
return false;
@@ -301,7 +302,7 @@ again:
stringstream ss;
ss << "HTTP/1.0 200 OK\r\nConnection: close\r\nContent-Type: text/plain\r\nContent-Length: " << msg.size() << "\r\n\r\n" << msg;
string s = ss.str();
- ::send( sock , s.c_str(), s.size(), 0 );
+ send( s.c_str(), s.size() );
return false;
}
log() << "bad recv() len: " << len << '\n';
@@ -321,7 +322,7 @@ again:
char *p = (char *) &md->id;
int left = len -4;
while ( 1 ) {
- int x = ::recv(sock, p, left, 0);
+ int x = recv( p, left );
if ( x == 0 ) {
DEV out() << "MessagingPort::recv(): conn closed? " << farEnd.toString() << endl;
m.reset();
@@ -376,6 +377,7 @@ again:
}
void MessagingPort::say(Message& toSend, int responseTo) {
+ assert( toSend.data );
mmm( out() << "* say() sock:" << this->sock << " thr:" << GetCurrentThreadId() << endl; )
toSend.data->id = nextMessageId();
toSend.data->responseTo = responseTo;
@@ -395,7 +397,7 @@ again:
}
if ( x == -100 )
- x = ::send(sock, (char*)toSend.data, toSend.data->len , portSendFlags );
+ x = send( (char*)toSend.data, toSend.data->len );
if ( x <= 0 ) {
log() << "MessagingPort say send() " << OUTPUT_ERRNO << ' ' << farEnd.toString() << endl;
@@ -404,6 +406,14 @@ again:
}
+ int MessagingPort::send( const char * data , const int len ){
+ return ::send( sock , data , len , portSendFlags );
+ }
+
+ int MessagingPort::recv( char * buf , int max ){
+ return ::recv( sock , buf , max , portRecvFlags );
+ }
+
void MessagingPort::piggyBack( Message& toSend , int responseTo ) {
if ( toSend.data->len > 1300 ) {
@@ -438,7 +448,7 @@ again:
} msgstart;
MSGID nextMessageId(){
- MSGID msgid = NextMsgId.atomicIncrement();
+ MSGID msgid = NextMsgId++;
if ( usingClientIds ){
msgid = msgid & 0xFFFF;
diff --git a/util/message.h b/util/message.h
index 8d6a46e..5dccaef 100644
--- a/util/message.h
+++ b/util/message.h
@@ -18,13 +18,14 @@
#pragma once
#include "../util/sock.h"
+#include "../util/atomic_int.h"
namespace mongo {
class Message;
class MessagingPort;
class PiggyBackData;
- typedef WrappingInt MSGID;
+ typedef AtomicUInt MSGID;
class Listener {
public:
@@ -73,6 +74,9 @@ namespace mongo {
void piggyBack( Message& toSend , int responseTo = -1 );
virtual unsigned remotePort();
+
+ int send( const char * data , const int len );
+ int recv( char * data , int max );
private:
int sock;
PiggyBackData * piggyBackData;
@@ -99,6 +103,24 @@ namespace mongo {
bool doesOpGetAResponse( int op );
+ inline const char * opToString( int op ){
+ switch ( op ){
+ case 0: return "none";
+ case opReply: return "reply";
+ case dbMsg: return "msg";
+ case dbUpdate: return "update";
+ case dbInsert: return "insert";
+ case dbQuery: return "query";
+ case dbGetMore: return "getmore";
+ case dbDelete: return "remove";
+ case dbKillCursors: return "killcursors";
+ default:
+ PRINT(op);
+ assert(0);
+ return "";
+ }
+ }
+
struct MsgData {
int len; /* len of the msg, including this field */
MSGID id; /* request/reply id's match... */
@@ -146,10 +168,14 @@ namespace mongo {
~Message() {
reset();
}
-
+
SockAddr from;
MsgData *data;
+ int operation() const {
+ return data->operation();
+ }
+
Message& operator=(Message& r) {
assert( data == 0 );
data = r.data;
@@ -175,9 +201,9 @@ namespace mongo {
void setData(int operation, const char *msgtxt) {
setData(operation, msgtxt, strlen(msgtxt)+1);
}
- void setData(int operation, const char *msgdata, int len) {
+ void setData(int operation, const char *msgdata, size_t len) {
assert(data == 0);
- int dataLen = len + sizeof(MsgData) - 4;
+ size_t dataLen = len + sizeof(MsgData) - 4;
MsgData *d = (MsgData *) malloc(dataLen);
memcpy(d->_data, msgdata, len);
d->len = fixEndian(dataLen);
diff --git a/util/message_server_asio.cpp b/util/message_server_asio.cpp
index 4d5fab0..7fca29a 100644
--- a/util/message_server_asio.cpp
+++ b/util/message_server_asio.cpp
@@ -27,23 +27,58 @@
#include "message.h"
#include "message_server.h"
-#include "../util/thread_pool.h"
+#include "../util/mvar.h"
using namespace boost;
using namespace boost::asio;
using namespace boost::asio::ip;
-//using namespace std;
namespace mongo {
+ class MessageServerSession;
+
namespace {
- ThreadPool tp;
+ class StickyThread{
+ public:
+ StickyThread()
+ : _thread(boost::ref(*this))
+ {}
+
+ ~StickyThread(){
+ _mss.put(boost::shared_ptr<MessageServerSession>());
+ _thread.join();
+ }
+
+ void ready(boost::shared_ptr<MessageServerSession> mss){
+ _mss.put(mss);
+ }
+
+ void operator() (){
+ boost::shared_ptr<MessageServerSession> mss;
+ while((mss = _mss.take())){ // intentionally not using ==
+ task(mss.get());
+ mss.reset();
+ }
+ }
+
+ private:
+ boost::thread _thread;
+ inline void task(MessageServerSession* mss); // must be defined after MessageServerSession
+
+ MVar<boost::shared_ptr<MessageServerSession> > _mss; // populated when given a task
+ };
+
+ vector<boost::shared_ptr<StickyThread> > thread_pool;
+ mongo::mutex tp_mutex; // this is only needed if io_service::run() is called from multiple threads
}
class MessageServerSession : public boost::enable_shared_from_this<MessageServerSession> , public AbstractMessagingPort {
public:
- MessageServerSession( MessageHandler * handler , io_service& ioservice ) : _handler( handler ) , _socket( ioservice ){
-
- }
+ MessageServerSession( MessageHandler * handler , io_service& ioservice )
+ : _handler( handler )
+ , _socket( ioservice )
+ , _portCache(0)
+ { }
+
~MessageServerSession(){
cout << "disconnect from: " << _socket.remote_endpoint() << endl;
}
@@ -81,7 +116,20 @@ namespace mongo {
}
void handleReadBody( const boost::system::error_code& error ){
- tp.schedule(&MessageServerSession::process, shared_from_this());
+ if (!_myThread){
+ mongo::mutex::scoped_lock(tp_mutex);
+ if (!thread_pool.empty()){
+ _myThread = thread_pool.back();
+ thread_pool.pop_back();
+ }
+ }
+
+ if (!_myThread) // pool is empty
+ _myThread.reset(new StickyThread());
+
+ assert(_myThread);
+
+ _myThread->ready(shared_from_this());
}
void process(){
@@ -98,6 +146,13 @@ namespace mongo {
}
void handleWriteDone( const boost::system::error_code& error ){
+ {
+ // return thread to pool after we have sent data to the client
+ mongo::mutex::scoped_lock(tp_mutex);
+ assert(_myThread);
+ thread_pool.push_back(_myThread);
+ _myThread.reset();
+ }
_cur.reset();
_reply.reset();
_startHeaderRead();
@@ -117,7 +172,9 @@ namespace mongo {
virtual unsigned remotePort(){
- return _socket.remote_endpoint().port();
+ if (!_portCache)
+ _portCache = _socket.remote_endpoint().port(); //this is expensive
+ return _portCache;
}
private:
@@ -134,7 +191,15 @@ namespace mongo {
MsgData _inHeader;
Message _cur;
Message _reply;
+
+ unsigned _portCache;
+
+ boost::shared_ptr<StickyThread> _myThread;
};
+
+ void StickyThread::task(MessageServerSession* mss){
+ mss->process();
+ }
class AsyncMessageServer : public MessageServer {
@@ -152,6 +217,7 @@ namespace mongo {
void run(){
cout << "AsyncMessageServer starting to listen on: " << _port << endl;
+ boost::thread other(boost::bind(&io_service::run, &_ioservice));
_ioservice.run();
cout << "AsyncMessageServer done listening on: " << _port << endl;
}
diff --git a/util/message_server_port.cpp b/util/message_server_port.cpp
index e5becc9..fa8f9e5 100644
--- a/util/message_server_port.cpp
+++ b/util/message_server_port.cpp
@@ -15,6 +15,8 @@
* limitations under the License.
*/
+#include "stdafx.h"
+
#ifndef USE_ASIO
#include "message.h"
diff --git a/util/miniwebserver.cpp b/util/miniwebserver.cpp
index b492153..61619d8 100644
--- a/util/miniwebserver.cpp
+++ b/util/miniwebserver.cpp
@@ -17,6 +17,7 @@
#include "stdafx.h"
#include "miniwebserver.h"
+#include "hex.h"
#include <pcrecpp.h>
@@ -81,12 +82,13 @@ namespace mongo {
return string( urlStart , (int)(end-urlStart) );
}
- void MiniWebServer::parseParams( map<string,string> & params , string query ) {
+ void MiniWebServer::parseParams( BSONObj & params , string query ) {
if ( query.size() == 0 )
return;
-
+
+ BSONObjBuilder b;
while ( query.size() ) {
-
+
string::size_type amp = query.find( "&" );
string cur;
@@ -103,9 +105,10 @@ namespace mongo {
if ( eq == string::npos )
continue;
- params[cur.substr(0,eq)] = cur.substr(eq+1);
+ b.append( urlDecode(cur.substr(0,eq)).c_str() , urlDecode(cur.substr(eq+1) ) );
}
- return;
+
+ params = b.obj();
}
string MiniWebServer::parseMethod( const char * headers ) {
@@ -203,7 +206,7 @@ namespace mongo {
void MiniWebServer::run() {
SockAddr from;
- while ( 1 ) {
+ while ( ! inShutdown() ) {
int s = accept(sock, (sockaddr *) &from.sa, &from.addressSize);
if ( s < 0 ) {
if ( errno == ECONNABORTED ) {
@@ -221,4 +224,20 @@ namespace mongo {
}
}
+ string MiniWebServer::urlDecode(const char* s){
+ stringstream out;
+ while(*s){
+ if (*s == '+'){
+ out << ' ';
+ }else if (*s == '%'){
+ out << fromHex(s+1);
+ s+=2;
+ }else{
+ out << *s;
+ }
+ s++;
+ }
+ return out.str();
+ }
+
} // namespace mongo
diff --git a/util/miniwebserver.h b/util/miniwebserver.h
index 27476d6..bdd2873 100644
--- a/util/miniwebserver.h
+++ b/util/miniwebserver.h
@@ -17,7 +17,9 @@
#pragma once
+#include "../stdafx.h"
#include "message.h"
+#include "../db/jsobj.h"
namespace mongo {
@@ -45,9 +47,12 @@ namespace mongo {
string parseURL( const char * buf );
string parseMethod( const char * headers );
string getHeader( const char * headers , string name );
- void parseParams( map<string,string> & params , string query );
+ void parseParams( BSONObj & params , string query );
static const char *body( const char *buf );
+ static string urlDecode(const char* s);
+ static string urlDecode(string s) {return urlDecode(s.c_str());}
+
private:
void accepted(int s, const SockAddr &from);
static bool fullReceive( const char *buf );
diff --git a/util/mmap.cpp b/util/mmap.cpp
index f3103d0..f6bbc73 100644
--- a/util/mmap.cpp
+++ b/util/mmap.cpp
@@ -17,20 +17,21 @@
#include "stdafx.h"
#include "mmap.h"
+#include "processinfo.h"
namespace mongo {
set<MemoryMappedFile*> mmfiles;
- boost::mutex mmmutex;
+ mongo::mutex mmmutex;
MemoryMappedFile::~MemoryMappedFile() {
close();
- boostlock lk( mmmutex );
+ scoped_lock lk( mmmutex );
mmfiles.erase(this);
}
void MemoryMappedFile::created(){
- boostlock lk( mmmutex );
+ scoped_lock lk( mmmutex );
mmfiles.insert(this);
}
@@ -54,7 +55,7 @@ namespace mongo {
long long MemoryMappedFile::totalMappedLength(){
unsigned long long total = 0;
- boostlock lk( mmmutex );
+ scoped_lock lk( mmmutex );
for ( set<MemoryMappedFile*>::iterator i = mmfiles.begin(); i != mmfiles.end(); i++ )
total += (*i)->length();
@@ -64,7 +65,7 @@ namespace mongo {
int MemoryMappedFile::flushAll( bool sync ){
int num = 0;
- boostlock lk( mmmutex );
+ scoped_lock lk( mmmutex );
for ( set<MemoryMappedFile*>::iterator i = mmfiles.begin(); i != mmfiles.end(); i++ ){
num++;
MemoryMappedFile * mmf = *i;
@@ -92,4 +93,18 @@ namespace mongo {
return map( filename , i );
}
+ void printMemInfo( const char * where ){
+ cout << "mem info: ";
+ if ( where )
+ cout << where << " ";
+ ProcessInfo pi;
+ if ( ! pi.supported() ){
+ cout << " not supported" << endl;
+ return;
+ }
+
+ cout << "vsize: " << pi.getVirtualMemorySize() << " resident: " << pi.getResidentSize() << " mapped: " << ( MemoryMappedFile::totalMappedLength() / ( 1024 * 1024 ) ) << endl;
+ }
+
+
} // namespace mongo
diff --git a/util/mmap.h b/util/mmap.h
index ed4ca99..947364b 100644
--- a/util/mmap.h
+++ b/util/mmap.h
@@ -22,6 +22,10 @@ namespace mongo {
class MemoryMappedFile {
public:
+ enum Options {
+ SEQUENTIAL = 1
+ };
+
MemoryMappedFile();
~MemoryMappedFile(); /* closes the file if open */
void close();
@@ -32,7 +36,7 @@ namespace mongo {
/* Creates with length if DNE, otherwise uses existing file length,
passed length.
*/
- void* map(const char *filename, long &length);
+ void* map(const char *filename, long &length, int options = 0 );
void flush(bool sync);
@@ -58,6 +62,7 @@ namespace mongo {
void *view;
long len;
};
-
+
+ void printMemInfo( const char * where );
} // namespace mongo
diff --git a/util/mmap_mm.cpp b/util/mmap_mm.cpp
index aa9b275..9cffad5 100644
--- a/util/mmap_mm.cpp
+++ b/util/mmap_mm.cpp
@@ -31,14 +31,13 @@ namespace mongo {
void MemoryMappedFile::close() {
if ( view )
- delete( view );
+ free( view );
view = 0;
len = 0;
}
- void* MemoryMappedFile::map(const char *filename, size_t length) {
- path p( filename );
-
+ void* MemoryMappedFile::map(const char *filename, long& length , int options ) {
+ assert( length );
view = malloc( length );
return view;
}
diff --git a/util/mmap_posix.cpp b/util/mmap_posix.cpp
index 1237220..836373d 100644
--- a/util/mmap_posix.cpp
+++ b/util/mmap_posix.cpp
@@ -49,7 +49,7 @@ namespace mongo {
#define O_NOATIME 0
#endif
- void* MemoryMappedFile::map(const char *filename, long &length) {
+ void* MemoryMappedFile::map(const char *filename, long &length, int options) {
// length may be updated by callee.
theFileAllocator().allocateAsap( filename, length );
len = length;
@@ -79,9 +79,19 @@ namespace mongo {
}
return 0;
}
+
+#if defined(__sunos__)
+#warning madvise not supported on solaris yet
+#else
+ if ( options & SEQUENTIAL ){
+ if ( madvise( view , length , MADV_SEQUENTIAL ) ){
+ out() << " madvise failed for " << filename << " " << OUTPUT_ERRNO << endl;
+ }
+ }
+#endif
return view;
}
-
+
void MemoryMappedFile::flush(bool sync) {
if ( view == 0 || fd == 0 )
return;
diff --git a/util/mmap_win.cpp b/util/mmap_win.cpp
index 8a0d306..d831d66 100644
--- a/util/mmap_win.cpp
+++ b/util/mmap_win.cpp
@@ -49,7 +49,7 @@ namespace mongo {
unsigned mapped = 0;
- void* MemoryMappedFile::map(const char *_filename, long &length) {
+ void* MemoryMappedFile::map(const char *_filename, long &length, int options) {
/* big hack here: Babble uses db names with colons. doesn't seem to work on windows. temporary perhaps. */
char filename[256];
strncpy(filename, _filename, 255);
@@ -69,9 +69,13 @@ namespace mongo {
updateLength( filename, length );
std::wstring filenamew = toWideString(filename);
+ DWORD createOptions = FILE_ATTRIBUTE_NORMAL;
+ if ( options & SEQUENTIAL )
+ createOptions |= FILE_FLAG_SEQUENTIAL_SCAN;
+
fd = CreateFile(
filenamew.c_str(), GENERIC_WRITE | GENERIC_READ, FILE_SHARE_READ,
- NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
+ NULL, OPEN_ALWAYS, createOptions , NULL);
if ( fd == INVALID_HANDLE_VALUE ) {
out() << "Create/OpenFile failed " << filename << ' ' << GetLastError() << endl;
return 0;
@@ -95,7 +99,21 @@ namespace mongo {
return view;
}
- void MemoryMappedFile::flush(bool) {
- }
+ void MemoryMappedFile::flush(bool sync) {
+ uassert(13056, "Async flushing not supported on windows", sync);
+ if (!view || !fd) return;
+
+ bool success = FlushViewOfFile(view, 0); // 0 means whole mapping
+ if (!success){
+ int err = GetLastError();
+ out() << "FlushViewOfFile failed " << err << endl;
+ }
+
+ success = FlushFileBuffers(fd);
+ if (!success){
+ int err = GetLastError();
+ out() << "FlushFileBuffers failed " << err << endl;
+ }
+ }
}
diff --git a/util/optime.h b/util/optime.h
index b7d4f61..8b26434 100644
--- a/util/optime.h
+++ b/util/optime.h
@@ -20,15 +20,24 @@
#include "../db/concurrency.h"
namespace mongo {
+ void exitCleanly( int code );
/* Operation sequence #. A combination of current second plus an ordinal value.
*/
+ struct ClockSkewException : public DBException {
+ virtual const char* what() const throw() { return "clock skew exception"; }
+ virtual int getCode(){ return 20001; }
+ };
+
#pragma pack(4)
class OpTime {
unsigned i;
unsigned secs;
static OpTime last;
public:
+ static void setLast(const Date_t &date) {
+ last = OpTime(date);
+ }
unsigned getSecs() const {
return secs;
}
@@ -49,6 +58,20 @@ namespace mongo {
static OpTime now() {
unsigned t = (unsigned) time(0);
// DEV assertInWriteLock();
+ if ( t < last.secs ){
+ bool toLog = false;
+ ONCE toLog = true;
+ RARELY toLog = true;
+ if ( last.i & 0x80000000 )
+ toLog = true;
+ if ( toLog )
+ log() << "clock skew detected prev: " << last.secs << " now: " << t << " trying to handle..." << endl;
+ if ( last.i & 0x80000000 ) {
+ log() << "ERROR Large clock skew detected, shutting down" << endl;
+ throw ClockSkewException();
+ }
+ t = last.secs;
+ }
if ( last.secs == t ) {
last.i++;
return last;
diff --git a/util/processinfo.h b/util/processinfo.h
index 83c3bcf..b7bc90d 100644
--- a/util/processinfo.h
+++ b/util/processinfo.h
@@ -52,6 +52,9 @@ namespace mongo {
bool supported();
+ bool blockCheckSupported();
+ bool blockInMemory( char * start );
+
private:
pid_t _pid;
};
diff --git a/util/processinfo_darwin.cpp b/util/processinfo_darwin.cpp
index 904f967..206c270 100644
--- a/util/processinfo_darwin.cpp
+++ b/util/processinfo_darwin.cpp
@@ -15,8 +15,9 @@
* limitations under the License.
*/
+#include "../stdafx.h"
#include "processinfo.h"
-
+#include "log.h"
#include <mach/task_info.h>
@@ -29,6 +30,9 @@
#include <mach/shared_memory_server.h>
#include <iostream>
+#include <sys/types.h>
+#include <sys/mman.h>
+
using namespace std;
namespace mongo {
@@ -63,7 +67,7 @@ namespace mongo {
cout << "error getting task_info: " << result << endl;
return 0;
}
- return (int)((double)ti.virtual_size / (1024.0 * 1024 * 2 ) );
+ return (int)((double)ti.virtual_size / (1024.0 * 1024 ) );
}
int ProcessInfo::getResidentSize(){
@@ -92,4 +96,22 @@ namespace mongo {
void ProcessInfo::getExtraInfo(BSONObjBuilder& info) {}
+ bool ProcessInfo::blockCheckSupported(){
+ return true;
+ }
+
+ bool ProcessInfo::blockInMemory( char * start ){
+ static long pageSize = 0;
+ if ( pageSize == 0 ){
+ pageSize = sysconf( _SC_PAGESIZE );
+ }
+ start = start - ( (unsigned long long)start % pageSize );
+ char x = 0;
+ if ( mincore( start , 128 , &x ) ){
+ log() << "mincore failed: " << OUTPUT_ERRNO << endl;
+ return 1;
+ }
+ return x & 0x1;
+ }
+
}
diff --git a/util/processinfo_linux2.cpp b/util/processinfo_linux2.cpp
index 3e00c06..eaaee09 100644
--- a/util/processinfo_linux2.cpp
+++ b/util/processinfo_linux2.cpp
@@ -21,6 +21,8 @@
#include <stdio.h>
#include <malloc.h>
#include <db/jsobj.h>
+#include <unistd.h>
+#include <sys/mman.h>
using namespace std;
@@ -212,4 +214,23 @@ namespace mongo {
info.append("page_faults", (int)p._maj_flt);
}
+ bool ProcessInfo::blockCheckSupported(){
+ return true;
+ }
+
+ bool ProcessInfo::blockInMemory( char * start ){
+ static long pageSize = 0;
+ if ( pageSize == 0 ){
+ pageSize = sysconf( _SC_PAGESIZE );
+ }
+ start = start - ( (unsigned long long)start % pageSize );
+ unsigned char x = 0;
+ if ( mincore( start , 128 , &x ) ){
+ log() << "mincore failed: " << OUTPUT_ERRNO << endl;
+ return 1;
+ }
+ return x & 0x1;
+ }
+
+
}
diff --git a/util/processinfo_none.cpp b/util/processinfo_none.cpp
index 57f4ca3..9af1766 100644
--- a/util/processinfo_none.cpp
+++ b/util/processinfo_none.cpp
@@ -42,5 +42,14 @@ namespace mongo {
}
void ProcessInfo::getExtraInfo(BSONObjBuilder& info) {}
+
+ bool ProcessInfo::blockCheckSupported(){
+ return false;
+ }
+
+ bool ProcessInfo::blockInMemory( char * start ){
+ assert(0);
+ return true;
+ }
}
diff --git a/util/processinfo_win32.cpp b/util/processinfo_win32.cpp
index 0f0bf2e..0705fcb 100644
--- a/util/processinfo_win32.cpp
+++ b/util/processinfo_win32.cpp
@@ -61,4 +61,14 @@ namespace mongo {
}
void ProcessInfo::getExtraInfo(BSONObjBuilder& info) {}
+
+ bool ProcessInfo::blockCheckSupported(){
+ return false;
+ }
+
+ bool ProcessInfo::blockInMemory( char * start ){
+ assert(0);
+ return true;
+ }
+
}
diff --git a/util/queue.h b/util/queue.h
index 8f4fbaf..d48e012 100644
--- a/util/queue.h
+++ b/util/queue.h
@@ -30,18 +30,18 @@ namespace mongo {
template<typename T> class BlockingQueue : boost::noncopyable {
public:
void push(T const& t){
- boostlock l( _lock );
+ scoped_lock l( _lock );
_queue.push( t );
_condition.notify_one();
}
bool empty() const {
- boostlock l( _lock );
+ scoped_lock l( _lock );
return _queue.empty();
}
bool tryPop( T & t ){
- boostlock l( _lock );
+ scoped_lock l( _lock );
if ( _queue.empty() )
return false;
@@ -53,9 +53,9 @@ namespace mongo {
T blockingPop(){
- boostlock l( _lock );
+ scoped_lock l( _lock );
while( _queue.empty() )
- _condition.wait( l );
+ _condition.wait( l.boost() );
T t = _queue.front();
_queue.pop();
@@ -65,7 +65,7 @@ namespace mongo {
private:
std::queue<T> _queue;
- mutable boost::mutex _lock;
+ mutable mongo::mutex _lock;
boost::condition _condition;
};
diff --git a/util/sock.cpp b/util/sock.cpp
index 5172692..5beac68 100644
--- a/util/sock.cpp
+++ b/util/sock.cpp
@@ -20,14 +20,14 @@
namespace mongo {
- static boost::mutex sock_mutex;
+ static mongo::mutex sock_mutex;
string hostbyname(const char *hostname) {
static string unknown = "0.0.0.0";
if ( unknown == hostname )
return unknown;
- boostlock lk(sock_mutex);
+ scoped_lock lk(sock_mutex);
#if defined(_WIN32)
if( inet_addr(hostname) != INADDR_NONE )
return hostname;
diff --git a/util/sock.h b/util/sock.h
index 5798a71..ee7a7ae 100644
--- a/util/sock.h
+++ b/util/sock.h
@@ -245,25 +245,25 @@ namespace mongo {
}
void add( int sock ){
- boostlock lk( _mutex );
+ scoped_lock lk( _mutex );
_sockets->insert( sock );
}
void remove( int sock ){
- boostlock lk( _mutex );
+ scoped_lock lk( _mutex );
_sockets->erase( sock );
}
void closeAll(){
set<int>* s;
{
- boostlock lk( _mutex );
+ scoped_lock lk( _mutex );
s = _sockets;
_sockets = new set<int>();
}
for ( set<int>::iterator i=s->begin(); i!=s->end(); i++ ){
int sock = *i;
- log() << "going to close listening socket: " << sock << endl;
+ log() << "\t going to close listening socket: " << sock << endl;
closesocket( sock );
}
@@ -272,7 +272,7 @@ namespace mongo {
static ListeningSockets* get();
private:
- boost::mutex _mutex;
+ mongo::mutex _mutex;
set<int>* _sockets;
static ListeningSockets* _instance;
};
diff --git a/util/thread_pool.cpp b/util/thread_pool.cpp
index b95bc1d..77d0d05 100644
--- a/util/thread_pool.cpp
+++ b/util/thread_pool.cpp
@@ -77,7 +77,7 @@ ThreadPool::ThreadPool(int nThreads)
: _tasksRemaining(0)
, _nThreads(nThreads)
{
- boostlock lock(_mutex);
+ scoped_lock lock(_mutex);
while (nThreads-- > 0){
Worker* worker = new Worker(*this);
_freeWorkers.push_front(worker);
@@ -99,14 +99,14 @@ ThreadPool::~ThreadPool(){
}
void ThreadPool::join(){
- boostlock lock(_mutex);
+ scoped_lock lock(_mutex);
while(_tasksRemaining){
- _condition.wait(lock);
+ _condition.wait(lock.boost());
}
}
void ThreadPool::schedule(Task task){
- boostlock lock(_mutex);
+ scoped_lock lock(_mutex);
_tasksRemaining++;
@@ -120,7 +120,7 @@ void ThreadPool::schedule(Task task){
// should only be called by a worker from the worker thread
void ThreadPool::task_done(Worker* worker){
- boostlock lock(_mutex);
+ scoped_lock lock(_mutex);
if (!_tasks.empty()){
worker->set_task(_tasks.front());
diff --git a/util/thread_pool.h b/util/thread_pool.h
index 91c2969..d891d7d 100644
--- a/util/thread_pool.h
+++ b/util/thread_pool.h
@@ -62,7 +62,7 @@ namespace threadpool {
int tasks_remaining() { return _tasksRemaining; }
private:
- boost::mutex _mutex;
+ mongo::mutex _mutex;
boost::condition _condition;
list<Worker*> _freeWorkers; //used as LIFO stack (always front)
diff --git a/util/top.cpp b/util/top.cpp
deleted file mode 100644
index 98d9598..0000000
--- a/util/top.cpp
+++ /dev/null
@@ -1,18 +0,0 @@
-// top.cpp
-
-#include "stdafx.h"
-#include "top.h"
-
-namespace mongo {
-
- Top::T Top::_snapshotStart = Top::currentTime();
- Top::D Top::_snapshotDuration;
- Top::UsageMap Top::_totalUsage;
- Top::UsageMap Top::_snapshotA;
- Top::UsageMap Top::_snapshotB;
- Top::UsageMap &Top::_snapshot = Top::_snapshotA;
- Top::UsageMap &Top::_nextSnapshot = Top::_snapshotB;
- boost::mutex Top::topMutex;
-
-
-}
diff --git a/util/top.h b/util/top.h
deleted file mode 100644
index aaf7c3f..0000000
--- a/util/top.h
+++ /dev/null
@@ -1,183 +0,0 @@
-// top.h : DB usage monitor.
-
-/* Copyright 2009 10gen Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <boost/date_time/posix_time/posix_time.hpp>
-#undef assert
-#define assert xassert
-
-namespace mongo {
-
- /* Records per namespace utilization of the mongod process.
- No two functions of this class may be called concurrently.
- */
- class Top {
- typedef boost::posix_time::ptime T;
- typedef boost::posix_time::time_duration D;
- typedef boost::tuple< D, int, int, int > UsageData;
- public:
- Top() : _read(false), _write(false) { }
-
- /* these are used to record activity: */
-
- void clientStart( const char *client ) {
- clientStop();
- _currentStart = currentTime();
- _current = client;
- }
-
- /* indicate current request is a read operation. */
- void setRead() { _read = true; }
-
- void setWrite() { _write = true; }
-
- void clientStop() {
- if ( _currentStart == T() )
- return;
- D d = currentTime() - _currentStart;
-
- {
- boostlock L(topMutex);
- recordUsage( _current, d );
- }
-
- _currentStart = T();
- _read = false;
- _write = false;
- }
-
- /* these are used to fetch the stats: */
-
- struct Usage {
- string ns;
- D time;
- double pct;
- int reads, writes, calls;
- };
-
- static void usage( vector< Usage > &res ) {
- boostlock L(topMutex);
-
- // Populate parent namespaces
- UsageMap snapshot;
- UsageMap totalUsage;
- fillParentNamespaces( snapshot, _snapshot );
- fillParentNamespaces( totalUsage, _totalUsage );
-
- multimap< D, string, more > sorted;
- for( UsageMap::iterator i = snapshot.begin(); i != snapshot.end(); ++i )
- sorted.insert( make_pair( i->second.get<0>(), i->first ) );
- for( multimap< D, string, more >::iterator i = sorted.begin(); i != sorted.end(); ++i ) {
- if ( trivialNs( i->second.c_str() ) )
- continue;
- Usage u;
- u.ns = i->second;
- u.time = totalUsage[ u.ns ].get<0>();
- u.pct = _snapshotDuration != D() ? 100.0 * i->first.ticks() / _snapshotDuration.ticks() : 0;
- u.reads = snapshot[ u.ns ].get<1>();
- u.writes = snapshot[ u.ns ].get<2>();
- u.calls = snapshot[ u.ns ].get<3>();
- res.push_back( u );
- }
- for( UsageMap::iterator i = totalUsage.begin(); i != totalUsage.end(); ++i ) {
- if ( snapshot.count( i->first ) != 0 || trivialNs( i->first.c_str() ) )
- continue;
- Usage u;
- u.ns = i->first;
- u.time = i->second.get<0>();
- u.pct = 0;
- u.reads = 0;
- u.writes = 0;
- u.calls = 0;
- res.push_back( u );
- }
- }
-
- static void completeSnapshot() {
- boostlock L(topMutex);
-
- if ( &_snapshot == &_snapshotA ) {
- _snapshot = _snapshotB;
- _nextSnapshot = _snapshotA;
- } else {
- _snapshot = _snapshotA;
- _nextSnapshot = _snapshotB;
- }
- _snapshotDuration = currentTime() - _snapshotStart;
- _snapshotStart = currentTime();
- _nextSnapshot.clear();
- }
-
- private:
- static boost::mutex topMutex;
- static bool trivialNs( const char *ns ) {
- const char *ret = strrchr( ns, '.' );
- return ret && ret[ 1 ] == '\0';
- }
- typedef map<string,UsageData> UsageMap; // duration, # reads, # writes, # total calls
- static T currentTime() {
- return boost::posix_time::microsec_clock::universal_time();
- }
- void recordUsage( const string &client, D duration ) {
- recordUsageForMap( _totalUsage, client, duration );
- recordUsageForMap( _nextSnapshot, client, duration );
- }
- void recordUsageForMap( UsageMap &map, const string &client, D duration ) {
- UsageData& g = map[client];
- g.get< 0 >() += duration;
- if ( _read && !_write )
- g.get< 1 >()++;
- else if ( !_read && _write )
- g.get< 2 >()++;
- g.get< 3 >()++;
- }
- static void fillParentNamespaces( UsageMap &to, const UsageMap &from ) {
- for( UsageMap::const_iterator i = from.begin(); i != from.end(); ++i ) {
- string current = i->first;
- size_t dot = current.rfind( "." );
- if ( dot == string::npos || dot != current.length() - 1 ) {
- inc( to[ current ], i->second );
- }
- while( dot != string::npos ) {
- current = current.substr( 0, dot );
- inc( to[ current ], i->second );
- dot = current.rfind( "." );
- }
- }
- }
- static void inc( UsageData &to, const UsageData &from ) {
- to.get<0>() += from.get<0>();
- to.get<1>() += from.get<1>();
- to.get<2>() += from.get<2>();
- to.get<3>() += from.get<3>();
- }
- struct more { bool operator()( const D &a, const D &b ) { return a > b; } };
- string _current;
- T _currentStart;
- static T _snapshotStart;
- static D _snapshotDuration;
- static UsageMap _totalUsage;
- static UsageMap _snapshotA;
- static UsageMap _snapshotB;
- static UsageMap &_snapshot;
- static UsageMap &_nextSnapshot;
- bool _read;
- bool _write;
- };
-
-} // namespace mongo
diff --git a/util/util.cpp b/util/util.cpp
index 78d8d52..8ae00f3 100644
--- a/util/util.cpp
+++ b/util/util.cpp
@@ -18,7 +18,6 @@
#include "stdafx.h"
#include "goodies.h"
#include "unittest.h"
-#include "top.h"
#include "file_allocator.h"
#include "optime.h"
@@ -35,7 +34,7 @@ namespace mongo {
const char * (*getcurns)() = default_getcurns;
int logLevel = 0;
- boost::mutex &Logstream::mutex = *( new boost::mutex );
+ mongo::mutex Logstream::mutex;
int Logstream::doneSetup = Logstream::magicNumber();
bool goingAway = false;
@@ -113,9 +112,9 @@ namespace mongo {
#if defined(_WIN32)
(std::cout << now << " " << s).flush();
#else
- assert( write( STDOUT_FILENO, now, 20 ) > 0 );
- assert( write( STDOUT_FILENO, " ", 1 ) > 0 );
- assert( write( STDOUT_FILENO, s.c_str(), s.length() ) > 0 );
+ write( STDOUT_FILENO, now, 20 );
+ write( STDOUT_FILENO, " ", 1 );
+ write( STDOUT_FILENO, s.c_str(), s.length() );
fsync( STDOUT_FILENO );
#endif
}
@@ -133,5 +132,12 @@ namespace mongo {
ss << "db version v" << versionString << ", pdfile version " << VERSION << "." << VERSION_MINOR;
return ss.str();
}
-
+
+ ostream& operator<<( ostream &s, const ThreadSafeString &o ){
+ s << (string)o;
+ return s;
+ }
+
+ bool __destroyingStatics = false;
+
} // namespace mongo