diff options
Diffstat (limited to 'util')
43 files changed, 1054 insertions, 412 deletions
diff --git a/util/allocator.h b/util/allocator.h index af8032c..90dbf24 100644 --- a/util/allocator.h +++ b/util/allocator.h @@ -34,16 +34,4 @@ namespace mongo { #define malloc mongo::ourmalloc #define realloc mongo::ourrealloc -#if defined(_WIN32) - inline void our_debug_free(void *p) { -#if 0 - // this is not safe if you malloc < 4 bytes so we don't use anymore - unsigned *u = (unsigned *) p; - u[0] = 0xEEEEEEEE; -#endif - free(p); - } -#define free our_debug_free -#endif - } // namespace mongo diff --git a/util/array.h b/util/array.h new file mode 100644 index 0000000..827c00e --- /dev/null +++ b/util/array.h @@ -0,0 +1,104 @@ +// array.h + +namespace mongo { + + template<typename T> + class FastArray { + public: + FastArray( int capacity=10000 ) + : _capacity( capacity ) , _size(0) , _end(this,capacity){ + _data = new T[capacity]; + } + + ~FastArray(){ + delete[] _data; + } + + void clear(){ + _size = 0; + } + + T& operator[]( int x ){ + assert( x >= 0 && x < _capacity ); + return _data[x]; + } + + T& getNext(){ + return _data[_size++]; + } + + void push_back( const T& t ){ + _data[_size++] = t; + } + + void sort( int (*comp)(const void *, const void *) ){ + qsort( _data , _size , sizeof(T) , comp ); + } + + int size(){ + return _size; + } + + bool hasSpace(){ + return _size < _capacity; + } + class iterator { + public: + iterator(){ + _it = 0; + _pos = 0; + } + + iterator( FastArray * it , int pos=0 ){ + _it = it; + _pos = pos; + } + + bool operator==(const iterator& other ) const { + return _pos == other._pos; + } + + bool operator!=(const iterator& other ) const { + return _pos != other._pos; + } + + void operator++(){ + _pos++; + } + + T& operator*(){ + return _it->_data[_pos]; + } + + operator string() const { + stringstream ss; + ss << _pos; + return ss.str(); + } + private: + FastArray * _it; + int _pos; + + friend class FastArray; + }; + + + iterator begin(){ + return iterator(this); + } + + iterator end(){ + _end._pos = _size; + return _end; + } + + + private: + int _capacity; + int _size; + + iterator _end; + + T * _data; + }; +} diff --git a/util/assert_util.cpp b/util/assert_util.cpp index d1d85b2..8c8477a 100644 --- a/util/assert_util.cpp +++ b/util/assert_util.cpp @@ -22,6 +22,26 @@ namespace mongo { + AssertionCount assertionCount; + + AssertionCount::AssertionCount() + : regular(0),warning(0),msg(0),user(0),rollovers(0){ + } + + void AssertionCount::rollover(){ + rollovers++; + regular = 0; + warning = 0; + msg = 0; + user = 0; + } + + void AssertionCount::condrollover( int newvalue ){ + static int max = (int)pow( 2.0 , 30 ); + if ( newvalue >= max ) + rollover(); + } + string getDbContext(); Assertion lastAssert[4]; @@ -32,9 +52,11 @@ namespace mongo { sayDbContext(); raiseError(0,msg && *msg ? msg : "wassertion failure"); lastAssert[1].set(msg, getDbContext().c_str(), file, line); + assertionCount.condrollover( ++assertionCount.warning ); } void asserted(const char *msg, const char *file, unsigned line) { + assertionCount.condrollover( ++assertionCount.regular ); problem() << "Assertion failure " << msg << ' ' << file << ' ' << dec << line << endl; sayDbContext(); raiseError(0,msg && *msg ? msg : "assertion failure"); @@ -54,6 +76,7 @@ namespace mongo { int uacount = 0; void uasserted(int msgid, const char *msg) { + assertionCount.condrollover( ++assertionCount.user ); if ( ++uacount < 100 ) log() << "User Exception " << msgid << ":" << msg << endl; else @@ -64,6 +87,7 @@ namespace mongo { } void msgasserted(int msgid, const char *msg) { + assertionCount.condrollover( ++assertionCount.warning ); log() << "Assertion: " << msgid << ":" << msg << endl; lastAssert[2].set(msg, getDbContext().c_str(), "", 0); raiseError(msgid,msg && *msg ? msg : "massert failure"); @@ -72,13 +96,22 @@ namespace mongo { throw MsgAssertionException(msgid, msg); } - boost::mutex *Assertion::_mutex = new boost::mutex(); + void streamNotGood( int code , string msg , std::ios& myios ){ + stringstream ss; + // errno might not work on all systems for streams + // if it doesn't for a system should deal with here + ss << msg << " stream invalie: " << OUTPUT_ERRNO; + throw UserException( code , ss.str() ); + } + + + mongo::mutex *Assertion::_mutex = new mongo::mutex(); string Assertion::toString() { if( _mutex == 0 ) return ""; - boostlock lk(*_mutex); + scoped_lock lk(*_mutex); if ( !isSet() ) return ""; @@ -166,5 +199,14 @@ namespace mongo { void rotateLogs( int signal ){ loggingManager.rotate(); } + + string errnostring( const char * prefix ){ + stringstream ss; + if ( prefix ) + ss << prefix << ": "; + ss << OUTPUT_ERRNO; + return ss.str(); + } + } diff --git a/util/assert_util.h b/util/assert_util.h index ccb60a0..bae3a55 100644 --- a/util/assert_util.h +++ b/util/assert_util.h @@ -32,7 +32,7 @@ namespace mongo { when = 0; } private: - static boost::mutex *_mutex; + static mongo::mutex *_mutex; char msg[128]; char context[128]; const char *file; @@ -44,7 +44,7 @@ namespace mongo { /* asserted during global variable initialization */ return; } - boostlock lk(*_mutex); + scoped_lock lk(*_mutex); strncpy(msg, m, 127); strncpy(context, ctxt, 127); file = f; @@ -67,6 +67,21 @@ namespace mongo { /* last assert of diff types: regular, wassert, msgassert, uassert: */ extern Assertion lastAssert[4]; + class AssertionCount { + public: + AssertionCount(); + void rollover(); + void condrollover( int newValue ); + + int regular; + int warning; + int msg; + int user; + int rollovers; + }; + + extern AssertionCount assertionCount; + class DBException : public std::exception { public: virtual const char* what() const throw() = 0; @@ -91,6 +106,11 @@ namespace mongo { } virtual int getCode(){ return code; } virtual const char* what() const throw() { return msg.c_str(); } + + /* true if an interrupted exception - see KillCurrentOp */ + bool interrupted() { + return code == 11600 || code == 11601; + } }; /* UserExceptions are valid errors that a user can cause, like out of disk space or duplicate key */ @@ -173,6 +193,10 @@ namespace mongo { #define ASSERT_ID_DUPKEY 11000 + void streamNotGood( int code , string msg , std::ios& myios ); + +#define ASSERT_STREAM_GOOD(msgid,msg,stream) (void)( (!!((stream).good())) || (mongo::streamNotGood(msgid, msg, stream), 0) ) + } // namespace mongo #define BOOST_CHECK_EXCEPTION( expression ) \ @@ -184,3 +208,12 @@ namespace mongo { } catch ( ... ) { \ massert( 10437 , "unknown boost failed" , false ); \ } + +#define DESTRUCTOR_GUARD( expression ) \ + try { \ + expression; \ + } catch ( const std::exception &e ) { \ + problem() << "caught exception (" << e.what() << ") in destructor (" << __FUNCTION__ << ")" << endl; \ + } catch ( ... ) { \ + problem() << "caught unknown exception in destructor (" << __FUNCTION__ << ")" << endl; \ + } diff --git a/util/atomic_int.h b/util/atomic_int.h new file mode 100644 index 0000000..de50560 --- /dev/null +++ b/util/atomic_int.h @@ -0,0 +1,100 @@ +// atomic_int.h +// atomic wrapper for unsigned + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#if defined(_WIN32) +# include <windows.h> +#endif + +namespace mongo{ + + + struct AtomicUInt{ + AtomicUInt() : x(0) {} + AtomicUInt(unsigned z) : x(z) { } + volatile unsigned x; + operator unsigned() const { + return x; + } + inline AtomicUInt operator++(); // ++prefix + inline AtomicUInt operator++(int);// postfix++ + inline AtomicUInt operator--(); // --prefix + inline AtomicUInt operator--(int); // postfix-- + }; + +#if defined(_WIN32) + AtomicUInt AtomicUInt::operator++(){ + // InterlockedIncrement returns the new value + return InterlockedIncrement((volatile long*)&x); //long is 32bits in Win64 + } + AtomicUInt AtomicUInt::operator++(int){ + return InterlockedIncrement((volatile long*)&x)-1; + } + AtomicUInt AtomicUInt::operator--(){ + return InterlockedDecrement((volatile long*)&x); + } + AtomicUInt AtomicUInt::operator--(int){ + return InterlockedDecrement((volatile long*)&x)+1; + } +#elif defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4) + // this is in GCC >= 4.1 + AtomicUInt AtomicUInt::operator++(){ + return __sync_add_and_fetch(&x, 1); + } + AtomicUInt AtomicUInt::operator++(int){ + return __sync_fetch_and_add(&x, 1); + } + AtomicUInt AtomicUInt::operator--(){ + return __sync_add_and_fetch(&x, -1); + } + AtomicUInt AtomicUInt::operator--(int){ + return __sync_fetch_and_add(&x, -1); + } +#elif defined(__GNUC__) && (defined(__i386__) || defined(__x86_64__)) + // from boost 1.39 interprocess/detail/atomic.hpp + + inline unsigned atomic_int_helper(volatile unsigned *x, int val){ + int r; + asm volatile + ( + "lock\n\t" + "xadd %1, %0": + "+m"( *x ), "=r"( r ): // outputs (%0, %1) + "1"( val ): // inputs (%2 == %1) + "memory", "cc" // clobbers + ); + return r; + } + AtomicUInt AtomicUInt::operator++(){ + return atomic_int_helper(&x, 1)+1; + } + AtomicUInt AtomicUInt::operator++(int){ + return atomic_int_helper(&x, 1); + } + AtomicUInt AtomicUInt::operator--(){ + return atomic_int_helper(&x, -1)-1; + } + AtomicUInt AtomicUInt::operator--(int){ + return atomic_int_helper(&x, -1); + } +#else +# error "unsupported compiler or platform" +#endif + +} // namespace mongo diff --git a/util/background.cpp b/util/background.cpp index ac3a48c..4125315 100644 --- a/util/background.cpp +++ b/util/background.cpp @@ -22,7 +22,7 @@ namespace mongo { BackgroundJob *BackgroundJob::grab = 0; - boost::mutex &BackgroundJob::mutex = *( new boost::mutex ); + mongo::mutex BackgroundJob::mutex; /* static */ void BackgroundJob::thr() { @@ -38,7 +38,7 @@ namespace mongo { } BackgroundJob& BackgroundJob::go() { - boostlock bl(mutex); + scoped_lock bl(mutex); assert( grab == 0 ); grab = this; boost::thread t(thr); diff --git a/util/background.h b/util/background.h index ff044cb..c95a5bd 100644 --- a/util/background.h +++ b/util/background.h @@ -27,7 +27,6 @@ namespace mongo { has finished. Thus one pattern of use is to embed a backgroundjob in your object and reuse it (or same thing with inheritance). */ - class BackgroundJob { protected: /* define this to do your work! */ @@ -65,7 +64,7 @@ namespace mongo { private: static BackgroundJob *grab; - static boost::mutex &mutex; + static mongo::mutex mutex; static void thr(); volatile State state; }; diff --git a/util/base64.cpp b/util/base64.cpp index cf2f485..8d9d544 100644 --- a/util/base64.cpp +++ b/util/base64.cpp @@ -17,48 +17,13 @@ */ #include "stdafx.h" +#include "base64.h" namespace mongo { namespace base64 { - class Alphabet { - public: - Alphabet(){ - encode = (unsigned char*) - "ABCDEFGHIJKLMNOPQRSTUVWXYZ" - "abcdefghijklmnopqrstuvwxyz" - "0123456789" - "+/"; - - decode = (unsigned char*)malloc(257); - memset( decode , 0 , 256 ); - for ( int i=0; i<64; i++ ){ - decode[ encode[i] ] = i; - } - - test(); - } - ~Alphabet(){ - free( decode ); - } + Alphabet alphabet; - void test(){ - assert( strlen( (char*)encode ) == 64 ); - for ( int i=0; i<26; i++ ) - assert( encode[i] == toupper( encode[i+26] ) ); - } - - char e( int x ){ - return encode[x&0x3f]; - } - - private: - const unsigned char * encode; - public: - unsigned char * decode; - } alphabet; - - void encode( stringstream& ss , const char * data , int size ){ for ( int i=0; i<size; i+=3 ){ int left = size - i; diff --git a/util/base64.h b/util/base64.h index 62caceb..c113eed 100644 --- a/util/base64.h +++ b/util/base64.h @@ -15,10 +15,47 @@ * limitations under the License. */ +#pragma once namespace mongo { namespace base64 { + class Alphabet { + public: + Alphabet() + : encode((unsigned char*) + "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "abcdefghijklmnopqrstuvwxyz" + "0123456789" + "+/") + , decode(new unsigned char[257]) + { + memset( decode.get() , 0 , 256 ); + for ( int i=0; i<64; i++ ){ + decode[ encode[i] ] = i; + } + + test(); + } + void test(){ + assert( strlen( (char*)encode ) == 64 ); + for ( int i=0; i<26; i++ ) + assert( encode[i] == toupper( encode[i+26] ) ); + } + + char e( int x ){ + return encode[x&0x3f]; + } + + private: + const unsigned char * encode; + public: + boost::scoped_array<unsigned char> decode; + }; + + extern Alphabet alphabet; + + void encode( stringstream& ss , const char * data , int size ); string encode( const char * data , int size ); string encode( const string& s ); diff --git a/util/builder.h b/util/builder.h index 5046b72..f9d3514 100644 --- a/util/builder.h +++ b/util/builder.h @@ -90,7 +90,7 @@ namespace mongo { append<double>(j); } - void append(const void *src, int len) { + void append(const void *src, size_t len) { memcpy(grow(len), src, len); } @@ -102,6 +102,10 @@ namespace mongo { append( (void *)str.c_str(), str.length() + 1 ); } + void append( int val , int padding ){ + + } + int len() const { return l; } @@ -197,7 +201,7 @@ namespace mongo { } string str(){ - return string(_buf.data,0,_buf.l); + return string(_buf.data, _buf.l); } private: diff --git a/util/debug_util.cpp b/util/debug_util.cpp index 283053f..9c2f5dc 100644 --- a/util/debug_util.cpp +++ b/util/debug_util.cpp @@ -21,7 +21,7 @@ namespace mongo { -#if defined(_DEBUG) && !defined(_WIN32) +#if defined(USE_GDBSERVER) /* Magic gdb trampoline * Do not call directly! call setupSIGTRAPforGDB() * Assumptions: diff --git a/util/file_allocator.h b/util/file_allocator.h index 73159d3..93b2b1c 100644 --- a/util/file_allocator.h +++ b/util/file_allocator.h @@ -54,7 +54,7 @@ namespace mongo { on windows anyway as we don't have to pre-zero the file there. */ #if !defined(_WIN32) - boostlock lk( pendingMutex_ ); + scoped_lock lk( pendingMutex_ ); if ( failed_ ) return; long oldSize = prevSize( name ); @@ -71,7 +71,7 @@ namespace mongo { // updated to match existing file size. void allocateAsap( const string &name, long &size ) { #if !defined(_WIN32) - boostlock lk( pendingMutex_ ); + scoped_lock lk( pendingMutex_ ); long oldSize = prevSize( name ); if ( oldSize != -1 ) { size = oldSize; @@ -91,7 +91,7 @@ namespace mongo { pendingUpdated_.notify_all(); while( inProgress( name ) ) { checkFailure(); - pendingUpdated_.wait( lk ); + pendingUpdated_.wait( lk.boost() ); } #endif } @@ -100,9 +100,9 @@ namespace mongo { #if !defined(_WIN32) if ( failed_ ) return; - boostlock lk( pendingMutex_ ); + scoped_lock lk( pendingMutex_ ); while( pending_.size() != 0 ) - pendingUpdated_.wait( lk ); + pendingUpdated_.wait( lk.boost() ); #endif } @@ -130,7 +130,7 @@ namespace mongo { return false; } - mutable boost::mutex pendingMutex_; + mutable mongo::mutex pendingMutex_; mutable boost::condition pendingUpdated_; list< string > pending_; mutable map< string, long > pendingSize_; @@ -142,21 +142,22 @@ namespace mongo { void operator()() { while( 1 ) { { - boostlock lk( a_.pendingMutex_ ); + scoped_lock lk( a_.pendingMutex_ ); if ( a_.pending_.size() == 0 ) - a_.pendingUpdated_.wait( lk ); + a_.pendingUpdated_.wait( lk.boost() ); } while( 1 ) { string name; long size; { - boostlock lk( a_.pendingMutex_ ); + scoped_lock lk( a_.pendingMutex_ ); if ( a_.pending_.size() == 0 ) break; name = a_.pending_.front(); size = a_.pendingSize_[ name ]; } try { + log() << "allocating new datafile " << name << ", filling with zeroes..." << endl; long fd = open(name.c_str(), O_CREAT | O_RDWR | O_NOATIME, S_IRUSR | S_IWUSR); if ( fd <= 0 ) { stringstream ss; @@ -180,19 +181,19 @@ namespace mongo { massert( 10442 , "Unable to allocate file of desired size", 1 == write(fd, "", 1) ); lseek(fd, 0, SEEK_SET); - log() << "allocating new datafile " << name << ", filling with zeroes..." << endl; Timer t; long z = 256 * 1024; char buf[z]; memset(buf, 0, z); long left = size; - while ( 1 ) { - if ( left <= z ) { - massert( 10443 , "write failed", left == write(fd, buf, left) ); - break; - } - massert( 10444 , "write failed", z == write(fd, buf, z) ); - left -= z; + while ( left > 0 ) { + long towrite = left; + if ( towrite > z ) + towrite = z; + + int written = write( fd , buf , towrite ); + massert( 10443 , errnostring("write failed" ), written > 0 ); + left -= written; } log() << "done allocating datafile " << name << ", size: " << size/1024/1024 << "MB, took " << ((double)t.millis())/1000.0 << " secs" << endl; } @@ -205,7 +206,7 @@ namespace mongo { BOOST_CHECK_EXCEPTION( boost::filesystem::remove( name ) ); } catch ( ... ) { } - boostlock lk( a_.pendingMutex_ ); + scoped_lock lk( a_.pendingMutex_ ); a_.failed_ = true; // not erasing from pending a_.pendingUpdated_.notify_all(); @@ -213,7 +214,7 @@ namespace mongo { } { - boostlock lk( a_.pendingMutex_ ); + scoped_lock lk( a_.pendingMutex_ ); a_.pendingSize_.erase( name ); a_.pending_.pop_front(); a_.pendingUpdated_.notify_all(); diff --git a/util/goodies.h b/util/goodies.h index 7eebc0a..4641941 100644 --- a/util/goodies.h +++ b/util/goodies.h @@ -24,7 +24,7 @@ namespace mongo { -#if !defined(_WIN32) && !defined(NOEXECINFO) +#if !defined(_WIN32) && !defined(NOEXECINFO) && !defined(__freebsd__) && !defined(__sun__) } // namespace mongo @@ -120,36 +120,11 @@ namespace mongo { x = 0; } WrappingInt(unsigned z) : x(z) { } - volatile unsigned x; + unsigned x; operator unsigned() const { return x; } - // returns original value (like x++) - WrappingInt atomicIncrement(){ -#if defined(_WIN32) - // InterlockedIncrement returns the new value - return InterlockedIncrement((volatile long*)&x)-1; //long is 32bits in Win64 -#elif defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4) - // this is in GCC >= 4.1 - return __sync_fetch_and_add(&x, 1); -#elif defined(__GNUC__) && (defined(__i386__) || defined(__x86_64__)) - // from boost 1.39 interprocess/detail/atomic.hpp - int r; - int val = 1; - asm volatile - ( - "lock\n\t" - "xadd %1, %0": - "+m"( x ), "=r"( r ): // outputs (%0, %1) - "1"( val ): // inputs (%2 == %1) - "memory", "cc" // clobbers - ); - return r; -#else -# error "unsupported compiler or platform" -#endif - } static int diff(unsigned a, unsigned b) { return a-b; @@ -179,6 +154,23 @@ namespace mongo { buf[24] = 0; // don't want the \n } + + inline void time_t_to_Struct(time_t t, struct tm * buf , bool local = false ) { +#if defined(_WIN32) + if ( local ) + localtime_s( buf , &t ); + else + gmtime_s(buf, &t); +#else + if ( local ) + localtime_r(&t, buf); + else + gmtime_r(&t, buf); +#endif + } + + + #define asctime _asctime_not_threadsafe_ #define gmtime _gmtime_not_threadsafe_ #define localtime _localtime_not_threadsafe_ @@ -278,8 +270,42 @@ namespace mongo { return secs*1000000 + t; } using namespace boost; - typedef boost::mutex::scoped_lock boostlock; - typedef boost::recursive_mutex::scoped_lock recursive_boostlock; + + extern bool __destroyingStatics; + + // If you create a local static instance of this class, that instance will be destroyed + // before all global static objects are destroyed, so __destroyingStatics will be set + // to true before the global static variables are destroyed. + class StaticObserver : boost::noncopyable { + public: + ~StaticObserver() { __destroyingStatics = true; } + }; + + // On pthread systems, it is an error to destroy a mutex while held. Static global + // mutexes may be held upon shutdown in our implementation, and this way we avoid + // destroying them. + class mutex : boost::noncopyable { + public: + mutex() { new (_buf) boost::mutex(); } + ~mutex() { + if( !__destroyingStatics ) { + boost().boost::mutex::~mutex(); + } + } + class scoped_lock : boost::noncopyable { + public: + scoped_lock( mongo::mutex &m ) : _l( m.boost() ) {} + boost::mutex::scoped_lock &boost() { return _l; } + private: + boost::mutex::scoped_lock _l; + }; + private: + boost::mutex &boost() { return *( boost::mutex * )( _buf ); } + char _buf[ sizeof( boost::mutex ) ]; + }; + + typedef mongo::mutex::scoped_lock scoped_lock; + typedef boost::recursive_mutex::scoped_lock recursive_scoped_lock; // simple scoped timer class Timer { @@ -318,7 +344,7 @@ namespace mongo { class DebugMutex : boost::noncopyable { friend class lock; - boost::mutex m; + mongo::mutex m; int locked; public: DebugMutex() : locked(0); { } @@ -327,17 +353,17 @@ namespace mongo { */ -//typedef boostlock lock; +//typedef scoped_lock lock; inline bool startsWith(const char *str, const char *prefix) { - unsigned l = strlen(prefix); + size_t l = strlen(prefix); if ( strlen(str) < l ) return false; return strncmp(str, prefix, l) == 0; } inline bool endsWith(const char *p, const char *suffix) { - int a = strlen(p); - int b = strlen(suffix); + size_t a = strlen(p); + size_t b = strlen(suffix); if ( b > a ) return false; return strcmp(p + a - b, suffix) == 0; } @@ -418,12 +444,39 @@ namespace mongo { class ProgressMeter { public: - ProgressMeter( long long total , int secondsBetween = 3 , int checkInterval = 100 ) - : _total( total ) , _secondsBetween( secondsBetween ) , _checkInterval( checkInterval ) , - _done(0) , _hits(0) , _lastTime( (int) time(0) ){ + ProgressMeter( long long total , int secondsBetween = 3 , int checkInterval = 100 ){ + reset( total , secondsBetween , checkInterval ); + } + + ProgressMeter(){ + _active = 0; + } + + void reset( long long total , int secondsBetween = 3 , int checkInterval = 100 ){ + _total = total; + _secondsBetween = secondsBetween; + _checkInterval = checkInterval; + + _done = 0; + _hits = 0; + _lastTime = (int)time(0); + + _active = 1; + } + + void finished(){ + _active = 0; + } + + bool isActive(){ + return _active; } bool hit( int n = 1 ){ + if ( ! _active ){ + cout << "warning: hit on in-active ProgressMeter" << endl; + } + _done += n; _hits++; if ( _hits % _checkInterval ) @@ -449,7 +502,16 @@ namespace mongo { return _hits; } + string toString() const { + if ( ! _active ) + return ""; + stringstream buf; + buf << _done << "/" << _total << " " << (_done*100)/_total << "%"; + return buf.str(); + } private: + + bool _active; long long _total; int _secondsBetween; @@ -468,7 +530,7 @@ namespace mongo { } bool tryAcquire(){ - boostlock lk( _mutex ); + scoped_lock lk( _mutex ); if ( _num <= 0 ){ if ( _num < 0 ){ cerr << "DISASTER! in TicketHolder" << endl; @@ -480,12 +542,12 @@ namespace mongo { } void release(){ - boostlock lk( _mutex ); + scoped_lock lk( _mutex ); _num++; } void resize( int newSize ){ - boostlock lk( _mutex ); + scoped_lock lk( _mutex ); int used = _outof - _num; if ( used > newSize ){ cout << "ERROR: can't resize since we're using (" << used << ") more than newSize(" << newSize << ")" << endl; @@ -507,7 +569,7 @@ namespace mongo { private: int _outof; int _num; - boost::mutex _mutex; + mongo::mutex _mutex; }; class TicketHolderReleaser { @@ -523,4 +585,108 @@ namespace mongo { TicketHolder * _holder; }; + + /** + * this is a thread safe string + * you will never get a bad pointer, though data may be mungedd + */ + class ThreadSafeString { + public: + ThreadSafeString( size_t size=256 ) + : _size( 256 ) , _buf( new char[256] ){ + memset( _buf , 0 , _size ); + } + + ThreadSafeString( const ThreadSafeString& other ) + : _size( other._size ) , _buf( new char[_size] ){ + strncpy( _buf , other._buf , _size ); + } + + ~ThreadSafeString(){ + delete[] _buf; + _buf = 0; + } + + operator string() const { + string s = _buf; + return s; + } + + ThreadSafeString& operator=( const char * str ){ + size_t s = strlen(str); + if ( s >= _size - 2 ) + s = _size - 2; + strncpy( _buf , str , s ); + _buf[s] = 0; + return *this; + } + + bool operator==( const ThreadSafeString& other ) const { + return strcmp( _buf , other._buf ) == 0; + } + + bool operator==( const char * str ) const { + return strcmp( _buf , str ) == 0; + } + + bool operator!=( const char * str ) const { + return strcmp( _buf , str ); + } + + bool empty() const { + return _buf[0] == 0; + } + + private: + size_t _size; + char * _buf; + }; + + ostream& operator<<( ostream &s, const ThreadSafeString &o ); + + inline bool isNumber( char c ) { + return c >= '0' && c <= '9'; + } + + // for convenience, '{' is greater than anything and stops number parsing + inline int lexNumCmp( const char *s1, const char *s2 ) { + int nret = 0; + while( *s1 && *s2 ) { + bool p1 = ( *s1 == '{' ); + bool p2 = ( *s2 == '{' ); + if ( p1 && !p2 ) + return 1; + if ( p2 && !p1 ) + return -1; + bool n1 = isNumber( *s1 ); + bool n2 = isNumber( *s2 ); + if ( n1 && n2 ) { + if ( nret == 0 ) { + nret = *s1 > *s2 ? 1 : ( *s1 == *s2 ? 0 : -1 ); + } + } else if ( n1 ) { + return 1; + } else if ( n2 ) { + return -1; + } else { + if ( nret ) { + return nret; + } + if ( *s1 > *s2 ) { + return 1; + } else if ( *s2 > *s1 ) { + return -1; + } + nret = 0; + } + ++s1; ++s2; + } + if ( *s1 ) { + return 1; + } else if ( *s2 ) { + return -1; + } + return nret; + } + } // namespace mongo diff --git a/util/hashtab.h b/util/hashtab.h index 214c0ae..d46591c 100644 --- a/util/hashtab.h +++ b/util/hashtab.h @@ -149,7 +149,7 @@ namespace mongo { typedef void (*IteratorCallback)( const Key& k , Type& v ); - void iterall( IteratorCallback callback ){ + void iterAll( IteratorCallback callback ){ for ( int i=0; i<n; i++ ){ if ( ! nodes[i].inUse() ) continue; diff --git a/util/hex.h b/util/hex.h new file mode 100644 index 0000000..cef3e80 --- /dev/null +++ b/util/hex.h @@ -0,0 +1,35 @@ +// util/hex.h + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +namespace mongo { + //can't use hex namespace because it conflicts with hex iostream function + inline int fromHex( char c ) { + if ( '0' <= c && c <= '9' ) + return c - '0'; + if ( 'a' <= c && c <= 'f' ) + return c - 'a' + 10; + if ( 'A' <= c && c <= 'F' ) + return c - 'A' + 10; + assert( false ); + return 0xff; + } + inline char fromHex( const char *c ) { + return ( fromHex( c[ 0 ] ) << 4 ) | fromHex( c[ 1 ] ); + } +} diff --git a/util/httpclient.cpp b/util/httpclient.cpp index 284bb63..08b6220 100644 --- a/util/httpclient.cpp +++ b/util/httpclient.cpp @@ -17,10 +17,25 @@ #include "stdafx.h" #include "httpclient.h" +#include "sock.h" +#include "message.h" +#include "builder.h" namespace mongo { - int HttpClient::get( string url , map<string,string>& headers, stringstream& data ){ + //#define HD(x) cout << x << endl; +#define HD(x) + + + int HttpClient::get( string url , Result * result ){ + return _go( "GET" , url , 0 , result ); + } + + int HttpClient::post( string url , string data , Result * result ){ + return _go( "POST" , url , data.c_str() , result ); + } + + int HttpClient::_go( const char * command , string url , const char * body , Result * result ){ uassert( 10271 , "invalid url" , url.find( "http://" ) == 0 ); url = url.substr( 7 ); @@ -34,28 +49,84 @@ namespace mongo { path = url.substr( url.find( "/" ) ); } + + HD( "host [" << host << "]" ); + HD( "path [" << path << "]" ); + + string server = host; int port = 80; - uassert( 10272 , "non standard port not supported yet" , host.find( ":" ) == string::npos ); - cout << "host [" << host << "]" << endl; - cout << "path [" << path << "]" << endl; - cout << "port: " << port << endl; + string::size_type idx = host.find( ":" ); + if ( idx != string::npos ){ + server = host.substr( 0 , idx ); + string t = host.substr( idx + 1 ); + port = atoi( t.c_str() ); + } + + HD( "server [" << server << "]" ); + HD( "port [" << port << "]" ); string req; { stringstream ss; - ss << "GET " << path << " HTTP/1.1\r\n"; + ss << command << " " << path << " HTTP/1.1\r\n"; ss << "Host: " << host << "\r\n"; ss << "Connection: Close\r\n"; ss << "User-Agent: mongodb http client\r\n"; + if ( body ) { + ss << "Content-Length: " << strlen( body ) << "\r\n"; + } ss << "\r\n"; + if ( body ) { + ss << body; + } req = ss.str(); } + + SockAddr addr( server.c_str() , port ); + HD( "addr: " << addr.toString() ); + + MessagingPort p; + if ( ! p.connect( addr ) ) + return -1; + + { + const char * out = req.c_str(); + int toSend = req.size(); + while ( toSend ){ + int did = p.send( out , toSend ); + toSend -= did; + out += did; + } + } + + char buf[4096]; + int got = p.recv( buf , 4096 ); + buf[got] = 0; + + int rc; + char version[32]; + assert( sscanf( buf , "%s %d" , version , &rc ) == 2 ); + HD( "rc: " << rc ); + + StringBuilder sb; + if ( result ) + sb << buf; + + while ( ( got = p.recv( buf , 4096 ) ) > 0){ + if ( result ) + sb << buf; + } - cout << req << endl; + if ( result ){ + result->_code = rc; + result->_entireResponse = sb.str(); + } - return -1; + return rc; } + + } diff --git a/util/httpclient.h b/util/httpclient.h index 14f0d87..ef3e147 100644 --- a/util/httpclient.h +++ b/util/httpclient.h @@ -23,7 +23,33 @@ namespace mongo { class HttpClient { public: - int get( string url , map<string,string>& headers, stringstream& data ); + + class Result { + public: + Result(){} + + const string& getEntireResponse() const { + return _entireResponse; + } + private: + int _code; + string _entireResponse; + friend class HttpClient; + }; + + /** + * @return response code + */ + int get( string url , Result * result = 0 ); + + /** + * @return response code + */ + int post( string url , string body , Result * result = 0 ); + + private: + int _go( const char * command , string url , const char * body , Result * result ); + }; } @@ -18,6 +18,7 @@ #pragma once #include <string.h> +#include <errno.h> namespace mongo { @@ -117,7 +118,7 @@ namespace mongo { #define LOGIT { ss << x; return *this; } class Logstream : public Nullstream { - static boost::mutex &mutex; + static mongo::mutex mutex; static int doneSetup; stringstream ss; public: @@ -127,7 +128,7 @@ namespace mongo { void flush() { // this ensures things are sane if ( doneSetup == 1717 ){ - boostlock lk(mutex); + scoped_lock lk(mutex); cout << ss.str(); cout.flush(); } @@ -244,4 +245,6 @@ namespace mongo { #define OUTPUT_ERRNOX(x) "errno:" << x << " " << strerror(x) #define OUTPUT_ERRNO OUTPUT_ERRNOX(errno) + string errnostring( const char * prefix = 0 ); + } // namespace mongo diff --git a/util/message.cpp b/util/message.cpp index 0fbc2d2..2c3d006 100644 --- a/util/message.cpp +++ b/util/message.cpp @@ -35,9 +35,11 @@ namespace mongo { #define mmm(x) #ifdef MSG_NOSIGNAL - const int portSendFlags = MSG_NOSIGNAL; + const int portSendFlags = MSG_NOSIGNAL; + const int portRecvFlags = MSG_NOSIGNAL; #else - const int portSendFlags = 0; + const int portSendFlags = 0; + const int portRecvFlags = 0; #endif /* listener ------------------------------------------------------------------- */ @@ -72,7 +74,7 @@ namespace mongo { void Listener::listen() { static long connNumber = 0; SockAddr from; - while ( 1 ) { + while ( ! inShutdown() ) { int s = accept(sock, (sockaddr *) &from.sa, &from.addressSize); if ( s < 0 ) { if ( errno == ECONNABORTED || errno == EBADF ) { @@ -117,7 +119,7 @@ namespace mongo { if ( _buf == _cur ) return 0; - int x = ::send( _port->sock , _buf , len() , 0 ); + int x = _port->send( _buf , len() ); _cur = _buf; return x; } @@ -136,23 +138,22 @@ namespace mongo { class Ports { set<MessagingPort*>& ports; - boost::mutex& m; + mongo::mutex m; public: // we "new" this so it is still be around when other automatic global vars // are being destructed during termination. - Ports() : ports( *(new set<MessagingPort*>()) ), - m( *(new boost::mutex()) ) { } + Ports() : ports( *(new set<MessagingPort*>()) ) {} void closeAll() { \ - boostlock bl(m); + scoped_lock bl(m); for ( set<MessagingPort*>::iterator i = ports.begin(); i != ports.end(); i++ ) (*i)->shutdown(); } void insert(MessagingPort* p) { - boostlock bl(m); + scoped_lock bl(m); ports.insert(p); } void erase(MessagingPort* p) { - boostlock bl(m); + scoped_lock bl(m); ports.erase(p); } } ports; @@ -263,7 +264,7 @@ again: char *lenbuf = (char *) &len; int lft = 4; while ( 1 ) { - int x = ::recv(sock, lenbuf, lft, 0); + int x = recv( lenbuf, lft ); if ( x == 0 ) { DEV out() << "MessagingPort recv() conn closed? " << farEnd.toString() << endl; m.reset(); @@ -286,7 +287,7 @@ again: if ( len == -1 ) { // Endian check from the database, after connecting, to see what mode server is running in. unsigned foo = 0x10203040; - int x = ::send(sock, (char *) &foo, 4, portSendFlags ); + int x = send( (char *) &foo, 4 ); if ( x <= 0 ) { log() << "MessagingPort endian send() " << OUTPUT_ERRNO << ' ' << farEnd.toString() << endl; return false; @@ -301,7 +302,7 @@ again: stringstream ss; ss << "HTTP/1.0 200 OK\r\nConnection: close\r\nContent-Type: text/plain\r\nContent-Length: " << msg.size() << "\r\n\r\n" << msg; string s = ss.str(); - ::send( sock , s.c_str(), s.size(), 0 ); + send( s.c_str(), s.size() ); return false; } log() << "bad recv() len: " << len << '\n'; @@ -321,7 +322,7 @@ again: char *p = (char *) &md->id; int left = len -4; while ( 1 ) { - int x = ::recv(sock, p, left, 0); + int x = recv( p, left ); if ( x == 0 ) { DEV out() << "MessagingPort::recv(): conn closed? " << farEnd.toString() << endl; m.reset(); @@ -376,6 +377,7 @@ again: } void MessagingPort::say(Message& toSend, int responseTo) { + assert( toSend.data ); mmm( out() << "* say() sock:" << this->sock << " thr:" << GetCurrentThreadId() << endl; ) toSend.data->id = nextMessageId(); toSend.data->responseTo = responseTo; @@ -395,7 +397,7 @@ again: } if ( x == -100 ) - x = ::send(sock, (char*)toSend.data, toSend.data->len , portSendFlags ); + x = send( (char*)toSend.data, toSend.data->len ); if ( x <= 0 ) { log() << "MessagingPort say send() " << OUTPUT_ERRNO << ' ' << farEnd.toString() << endl; @@ -404,6 +406,14 @@ again: } + int MessagingPort::send( const char * data , const int len ){ + return ::send( sock , data , len , portSendFlags ); + } + + int MessagingPort::recv( char * buf , int max ){ + return ::recv( sock , buf , max , portRecvFlags ); + } + void MessagingPort::piggyBack( Message& toSend , int responseTo ) { if ( toSend.data->len > 1300 ) { @@ -438,7 +448,7 @@ again: } msgstart; MSGID nextMessageId(){ - MSGID msgid = NextMsgId.atomicIncrement(); + MSGID msgid = NextMsgId++; if ( usingClientIds ){ msgid = msgid & 0xFFFF; diff --git a/util/message.h b/util/message.h index 8d6a46e..5dccaef 100644 --- a/util/message.h +++ b/util/message.h @@ -18,13 +18,14 @@ #pragma once #include "../util/sock.h" +#include "../util/atomic_int.h" namespace mongo { class Message; class MessagingPort; class PiggyBackData; - typedef WrappingInt MSGID; + typedef AtomicUInt MSGID; class Listener { public: @@ -73,6 +74,9 @@ namespace mongo { void piggyBack( Message& toSend , int responseTo = -1 ); virtual unsigned remotePort(); + + int send( const char * data , const int len ); + int recv( char * data , int max ); private: int sock; PiggyBackData * piggyBackData; @@ -99,6 +103,24 @@ namespace mongo { bool doesOpGetAResponse( int op ); + inline const char * opToString( int op ){ + switch ( op ){ + case 0: return "none"; + case opReply: return "reply"; + case dbMsg: return "msg"; + case dbUpdate: return "update"; + case dbInsert: return "insert"; + case dbQuery: return "query"; + case dbGetMore: return "getmore"; + case dbDelete: return "remove"; + case dbKillCursors: return "killcursors"; + default: + PRINT(op); + assert(0); + return ""; + } + } + struct MsgData { int len; /* len of the msg, including this field */ MSGID id; /* request/reply id's match... */ @@ -146,10 +168,14 @@ namespace mongo { ~Message() { reset(); } - + SockAddr from; MsgData *data; + int operation() const { + return data->operation(); + } + Message& operator=(Message& r) { assert( data == 0 ); data = r.data; @@ -175,9 +201,9 @@ namespace mongo { void setData(int operation, const char *msgtxt) { setData(operation, msgtxt, strlen(msgtxt)+1); } - void setData(int operation, const char *msgdata, int len) { + void setData(int operation, const char *msgdata, size_t len) { assert(data == 0); - int dataLen = len + sizeof(MsgData) - 4; + size_t dataLen = len + sizeof(MsgData) - 4; MsgData *d = (MsgData *) malloc(dataLen); memcpy(d->_data, msgdata, len); d->len = fixEndian(dataLen); diff --git a/util/message_server_asio.cpp b/util/message_server_asio.cpp index 4d5fab0..7fca29a 100644 --- a/util/message_server_asio.cpp +++ b/util/message_server_asio.cpp @@ -27,23 +27,58 @@ #include "message.h" #include "message_server.h" -#include "../util/thread_pool.h" +#include "../util/mvar.h" using namespace boost; using namespace boost::asio; using namespace boost::asio::ip; -//using namespace std; namespace mongo { + class MessageServerSession; + namespace { - ThreadPool tp; + class StickyThread{ + public: + StickyThread() + : _thread(boost::ref(*this)) + {} + + ~StickyThread(){ + _mss.put(boost::shared_ptr<MessageServerSession>()); + _thread.join(); + } + + void ready(boost::shared_ptr<MessageServerSession> mss){ + _mss.put(mss); + } + + void operator() (){ + boost::shared_ptr<MessageServerSession> mss; + while((mss = _mss.take())){ // intentionally not using == + task(mss.get()); + mss.reset(); + } + } + + private: + boost::thread _thread; + inline void task(MessageServerSession* mss); // must be defined after MessageServerSession + + MVar<boost::shared_ptr<MessageServerSession> > _mss; // populated when given a task + }; + + vector<boost::shared_ptr<StickyThread> > thread_pool; + mongo::mutex tp_mutex; // this is only needed if io_service::run() is called from multiple threads } class MessageServerSession : public boost::enable_shared_from_this<MessageServerSession> , public AbstractMessagingPort { public: - MessageServerSession( MessageHandler * handler , io_service& ioservice ) : _handler( handler ) , _socket( ioservice ){ - - } + MessageServerSession( MessageHandler * handler , io_service& ioservice ) + : _handler( handler ) + , _socket( ioservice ) + , _portCache(0) + { } + ~MessageServerSession(){ cout << "disconnect from: " << _socket.remote_endpoint() << endl; } @@ -81,7 +116,20 @@ namespace mongo { } void handleReadBody( const boost::system::error_code& error ){ - tp.schedule(&MessageServerSession::process, shared_from_this()); + if (!_myThread){ + mongo::mutex::scoped_lock(tp_mutex); + if (!thread_pool.empty()){ + _myThread = thread_pool.back(); + thread_pool.pop_back(); + } + } + + if (!_myThread) // pool is empty + _myThread.reset(new StickyThread()); + + assert(_myThread); + + _myThread->ready(shared_from_this()); } void process(){ @@ -98,6 +146,13 @@ namespace mongo { } void handleWriteDone( const boost::system::error_code& error ){ + { + // return thread to pool after we have sent data to the client + mongo::mutex::scoped_lock(tp_mutex); + assert(_myThread); + thread_pool.push_back(_myThread); + _myThread.reset(); + } _cur.reset(); _reply.reset(); _startHeaderRead(); @@ -117,7 +172,9 @@ namespace mongo { virtual unsigned remotePort(){ - return _socket.remote_endpoint().port(); + if (!_portCache) + _portCache = _socket.remote_endpoint().port(); //this is expensive + return _portCache; } private: @@ -134,7 +191,15 @@ namespace mongo { MsgData _inHeader; Message _cur; Message _reply; + + unsigned _portCache; + + boost::shared_ptr<StickyThread> _myThread; }; + + void StickyThread::task(MessageServerSession* mss){ + mss->process(); + } class AsyncMessageServer : public MessageServer { @@ -152,6 +217,7 @@ namespace mongo { void run(){ cout << "AsyncMessageServer starting to listen on: " << _port << endl; + boost::thread other(boost::bind(&io_service::run, &_ioservice)); _ioservice.run(); cout << "AsyncMessageServer done listening on: " << _port << endl; } diff --git a/util/message_server_port.cpp b/util/message_server_port.cpp index e5becc9..fa8f9e5 100644 --- a/util/message_server_port.cpp +++ b/util/message_server_port.cpp @@ -15,6 +15,8 @@ * limitations under the License. */ +#include "stdafx.h" + #ifndef USE_ASIO #include "message.h" diff --git a/util/miniwebserver.cpp b/util/miniwebserver.cpp index b492153..61619d8 100644 --- a/util/miniwebserver.cpp +++ b/util/miniwebserver.cpp @@ -17,6 +17,7 @@ #include "stdafx.h" #include "miniwebserver.h" +#include "hex.h" #include <pcrecpp.h> @@ -81,12 +82,13 @@ namespace mongo { return string( urlStart , (int)(end-urlStart) ); } - void MiniWebServer::parseParams( map<string,string> & params , string query ) { + void MiniWebServer::parseParams( BSONObj & params , string query ) { if ( query.size() == 0 ) return; - + + BSONObjBuilder b; while ( query.size() ) { - + string::size_type amp = query.find( "&" ); string cur; @@ -103,9 +105,10 @@ namespace mongo { if ( eq == string::npos ) continue; - params[cur.substr(0,eq)] = cur.substr(eq+1); + b.append( urlDecode(cur.substr(0,eq)).c_str() , urlDecode(cur.substr(eq+1) ) ); } - return; + + params = b.obj(); } string MiniWebServer::parseMethod( const char * headers ) { @@ -203,7 +206,7 @@ namespace mongo { void MiniWebServer::run() { SockAddr from; - while ( 1 ) { + while ( ! inShutdown() ) { int s = accept(sock, (sockaddr *) &from.sa, &from.addressSize); if ( s < 0 ) { if ( errno == ECONNABORTED ) { @@ -221,4 +224,20 @@ namespace mongo { } } + string MiniWebServer::urlDecode(const char* s){ + stringstream out; + while(*s){ + if (*s == '+'){ + out << ' '; + }else if (*s == '%'){ + out << fromHex(s+1); + s+=2; + }else{ + out << *s; + } + s++; + } + return out.str(); + } + } // namespace mongo diff --git a/util/miniwebserver.h b/util/miniwebserver.h index 27476d6..bdd2873 100644 --- a/util/miniwebserver.h +++ b/util/miniwebserver.h @@ -17,7 +17,9 @@ #pragma once +#include "../stdafx.h" #include "message.h" +#include "../db/jsobj.h" namespace mongo { @@ -45,9 +47,12 @@ namespace mongo { string parseURL( const char * buf ); string parseMethod( const char * headers ); string getHeader( const char * headers , string name ); - void parseParams( map<string,string> & params , string query ); + void parseParams( BSONObj & params , string query ); static const char *body( const char *buf ); + static string urlDecode(const char* s); + static string urlDecode(string s) {return urlDecode(s.c_str());} + private: void accepted(int s, const SockAddr &from); static bool fullReceive( const char *buf ); diff --git a/util/mmap.cpp b/util/mmap.cpp index f3103d0..f6bbc73 100644 --- a/util/mmap.cpp +++ b/util/mmap.cpp @@ -17,20 +17,21 @@ #include "stdafx.h" #include "mmap.h" +#include "processinfo.h" namespace mongo { set<MemoryMappedFile*> mmfiles; - boost::mutex mmmutex; + mongo::mutex mmmutex; MemoryMappedFile::~MemoryMappedFile() { close(); - boostlock lk( mmmutex ); + scoped_lock lk( mmmutex ); mmfiles.erase(this); } void MemoryMappedFile::created(){ - boostlock lk( mmmutex ); + scoped_lock lk( mmmutex ); mmfiles.insert(this); } @@ -54,7 +55,7 @@ namespace mongo { long long MemoryMappedFile::totalMappedLength(){ unsigned long long total = 0; - boostlock lk( mmmutex ); + scoped_lock lk( mmmutex ); for ( set<MemoryMappedFile*>::iterator i = mmfiles.begin(); i != mmfiles.end(); i++ ) total += (*i)->length(); @@ -64,7 +65,7 @@ namespace mongo { int MemoryMappedFile::flushAll( bool sync ){ int num = 0; - boostlock lk( mmmutex ); + scoped_lock lk( mmmutex ); for ( set<MemoryMappedFile*>::iterator i = mmfiles.begin(); i != mmfiles.end(); i++ ){ num++; MemoryMappedFile * mmf = *i; @@ -92,4 +93,18 @@ namespace mongo { return map( filename , i ); } + void printMemInfo( const char * where ){ + cout << "mem info: "; + if ( where ) + cout << where << " "; + ProcessInfo pi; + if ( ! pi.supported() ){ + cout << " not supported" << endl; + return; + } + + cout << "vsize: " << pi.getVirtualMemorySize() << " resident: " << pi.getResidentSize() << " mapped: " << ( MemoryMappedFile::totalMappedLength() / ( 1024 * 1024 ) ) << endl; + } + + } // namespace mongo diff --git a/util/mmap.h b/util/mmap.h index ed4ca99..947364b 100644 --- a/util/mmap.h +++ b/util/mmap.h @@ -22,6 +22,10 @@ namespace mongo { class MemoryMappedFile { public: + enum Options { + SEQUENTIAL = 1 + }; + MemoryMappedFile(); ~MemoryMappedFile(); /* closes the file if open */ void close(); @@ -32,7 +36,7 @@ namespace mongo { /* Creates with length if DNE, otherwise uses existing file length, passed length. */ - void* map(const char *filename, long &length); + void* map(const char *filename, long &length, int options = 0 ); void flush(bool sync); @@ -58,6 +62,7 @@ namespace mongo { void *view; long len; }; - + + void printMemInfo( const char * where ); } // namespace mongo diff --git a/util/mmap_mm.cpp b/util/mmap_mm.cpp index aa9b275..9cffad5 100644 --- a/util/mmap_mm.cpp +++ b/util/mmap_mm.cpp @@ -31,14 +31,13 @@ namespace mongo { void MemoryMappedFile::close() { if ( view ) - delete( view ); + free( view ); view = 0; len = 0; } - void* MemoryMappedFile::map(const char *filename, size_t length) { - path p( filename ); - + void* MemoryMappedFile::map(const char *filename, long& length , int options ) { + assert( length ); view = malloc( length ); return view; } diff --git a/util/mmap_posix.cpp b/util/mmap_posix.cpp index 1237220..836373d 100644 --- a/util/mmap_posix.cpp +++ b/util/mmap_posix.cpp @@ -49,7 +49,7 @@ namespace mongo { #define O_NOATIME 0 #endif - void* MemoryMappedFile::map(const char *filename, long &length) { + void* MemoryMappedFile::map(const char *filename, long &length, int options) { // length may be updated by callee. theFileAllocator().allocateAsap( filename, length ); len = length; @@ -79,9 +79,19 @@ namespace mongo { } return 0; } + +#if defined(__sunos__) +#warning madvise not supported on solaris yet +#else + if ( options & SEQUENTIAL ){ + if ( madvise( view , length , MADV_SEQUENTIAL ) ){ + out() << " madvise failed for " << filename << " " << OUTPUT_ERRNO << endl; + } + } +#endif return view; } - + void MemoryMappedFile::flush(bool sync) { if ( view == 0 || fd == 0 ) return; diff --git a/util/mmap_win.cpp b/util/mmap_win.cpp index 8a0d306..d831d66 100644 --- a/util/mmap_win.cpp +++ b/util/mmap_win.cpp @@ -49,7 +49,7 @@ namespace mongo { unsigned mapped = 0; - void* MemoryMappedFile::map(const char *_filename, long &length) { + void* MemoryMappedFile::map(const char *_filename, long &length, int options) { /* big hack here: Babble uses db names with colons. doesn't seem to work on windows. temporary perhaps. */ char filename[256]; strncpy(filename, _filename, 255); @@ -69,9 +69,13 @@ namespace mongo { updateLength( filename, length ); std::wstring filenamew = toWideString(filename); + DWORD createOptions = FILE_ATTRIBUTE_NORMAL; + if ( options & SEQUENTIAL ) + createOptions |= FILE_FLAG_SEQUENTIAL_SCAN; + fd = CreateFile( filenamew.c_str(), GENERIC_WRITE | GENERIC_READ, FILE_SHARE_READ, - NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL); + NULL, OPEN_ALWAYS, createOptions , NULL); if ( fd == INVALID_HANDLE_VALUE ) { out() << "Create/OpenFile failed " << filename << ' ' << GetLastError() << endl; return 0; @@ -95,7 +99,21 @@ namespace mongo { return view; } - void MemoryMappedFile::flush(bool) { - } + void MemoryMappedFile::flush(bool sync) { + uassert(13056, "Async flushing not supported on windows", sync); + if (!view || !fd) return; + + bool success = FlushViewOfFile(view, 0); // 0 means whole mapping + if (!success){ + int err = GetLastError(); + out() << "FlushViewOfFile failed " << err << endl; + } + + success = FlushFileBuffers(fd); + if (!success){ + int err = GetLastError(); + out() << "FlushFileBuffers failed " << err << endl; + } + } } diff --git a/util/optime.h b/util/optime.h index b7d4f61..8b26434 100644 --- a/util/optime.h +++ b/util/optime.h @@ -20,15 +20,24 @@ #include "../db/concurrency.h" namespace mongo { + void exitCleanly( int code ); /* Operation sequence #. A combination of current second plus an ordinal value. */ + struct ClockSkewException : public DBException { + virtual const char* what() const throw() { return "clock skew exception"; } + virtual int getCode(){ return 20001; } + }; + #pragma pack(4) class OpTime { unsigned i; unsigned secs; static OpTime last; public: + static void setLast(const Date_t &date) { + last = OpTime(date); + } unsigned getSecs() const { return secs; } @@ -49,6 +58,20 @@ namespace mongo { static OpTime now() { unsigned t = (unsigned) time(0); // DEV assertInWriteLock(); + if ( t < last.secs ){ + bool toLog = false; + ONCE toLog = true; + RARELY toLog = true; + if ( last.i & 0x80000000 ) + toLog = true; + if ( toLog ) + log() << "clock skew detected prev: " << last.secs << " now: " << t << " trying to handle..." << endl; + if ( last.i & 0x80000000 ) { + log() << "ERROR Large clock skew detected, shutting down" << endl; + throw ClockSkewException(); + } + t = last.secs; + } if ( last.secs == t ) { last.i++; return last; diff --git a/util/processinfo.h b/util/processinfo.h index 83c3bcf..b7bc90d 100644 --- a/util/processinfo.h +++ b/util/processinfo.h @@ -52,6 +52,9 @@ namespace mongo { bool supported(); + bool blockCheckSupported(); + bool blockInMemory( char * start ); + private: pid_t _pid; }; diff --git a/util/processinfo_darwin.cpp b/util/processinfo_darwin.cpp index 904f967..206c270 100644 --- a/util/processinfo_darwin.cpp +++ b/util/processinfo_darwin.cpp @@ -15,8 +15,9 @@ * limitations under the License. */ +#include "../stdafx.h" #include "processinfo.h" - +#include "log.h" #include <mach/task_info.h> @@ -29,6 +30,9 @@ #include <mach/shared_memory_server.h> #include <iostream> +#include <sys/types.h> +#include <sys/mman.h> + using namespace std; namespace mongo { @@ -63,7 +67,7 @@ namespace mongo { cout << "error getting task_info: " << result << endl; return 0; } - return (int)((double)ti.virtual_size / (1024.0 * 1024 * 2 ) ); + return (int)((double)ti.virtual_size / (1024.0 * 1024 ) ); } int ProcessInfo::getResidentSize(){ @@ -92,4 +96,22 @@ namespace mongo { void ProcessInfo::getExtraInfo(BSONObjBuilder& info) {} + bool ProcessInfo::blockCheckSupported(){ + return true; + } + + bool ProcessInfo::blockInMemory( char * start ){ + static long pageSize = 0; + if ( pageSize == 0 ){ + pageSize = sysconf( _SC_PAGESIZE ); + } + start = start - ( (unsigned long long)start % pageSize ); + char x = 0; + if ( mincore( start , 128 , &x ) ){ + log() << "mincore failed: " << OUTPUT_ERRNO << endl; + return 1; + } + return x & 0x1; + } + } diff --git a/util/processinfo_linux2.cpp b/util/processinfo_linux2.cpp index 3e00c06..eaaee09 100644 --- a/util/processinfo_linux2.cpp +++ b/util/processinfo_linux2.cpp @@ -21,6 +21,8 @@ #include <stdio.h> #include <malloc.h> #include <db/jsobj.h> +#include <unistd.h> +#include <sys/mman.h> using namespace std; @@ -212,4 +214,23 @@ namespace mongo { info.append("page_faults", (int)p._maj_flt); } + bool ProcessInfo::blockCheckSupported(){ + return true; + } + + bool ProcessInfo::blockInMemory( char * start ){ + static long pageSize = 0; + if ( pageSize == 0 ){ + pageSize = sysconf( _SC_PAGESIZE ); + } + start = start - ( (unsigned long long)start % pageSize ); + unsigned char x = 0; + if ( mincore( start , 128 , &x ) ){ + log() << "mincore failed: " << OUTPUT_ERRNO << endl; + return 1; + } + return x & 0x1; + } + + } diff --git a/util/processinfo_none.cpp b/util/processinfo_none.cpp index 57f4ca3..9af1766 100644 --- a/util/processinfo_none.cpp +++ b/util/processinfo_none.cpp @@ -42,5 +42,14 @@ namespace mongo { } void ProcessInfo::getExtraInfo(BSONObjBuilder& info) {} + + bool ProcessInfo::blockCheckSupported(){ + return false; + } + + bool ProcessInfo::blockInMemory( char * start ){ + assert(0); + return true; + } } diff --git a/util/processinfo_win32.cpp b/util/processinfo_win32.cpp index 0f0bf2e..0705fcb 100644 --- a/util/processinfo_win32.cpp +++ b/util/processinfo_win32.cpp @@ -61,4 +61,14 @@ namespace mongo { } void ProcessInfo::getExtraInfo(BSONObjBuilder& info) {} + + bool ProcessInfo::blockCheckSupported(){ + return false; + } + + bool ProcessInfo::blockInMemory( char * start ){ + assert(0); + return true; + } + } diff --git a/util/queue.h b/util/queue.h index 8f4fbaf..d48e012 100644 --- a/util/queue.h +++ b/util/queue.h @@ -30,18 +30,18 @@ namespace mongo { template<typename T> class BlockingQueue : boost::noncopyable { public: void push(T const& t){ - boostlock l( _lock ); + scoped_lock l( _lock ); _queue.push( t ); _condition.notify_one(); } bool empty() const { - boostlock l( _lock ); + scoped_lock l( _lock ); return _queue.empty(); } bool tryPop( T & t ){ - boostlock l( _lock ); + scoped_lock l( _lock ); if ( _queue.empty() ) return false; @@ -53,9 +53,9 @@ namespace mongo { T blockingPop(){ - boostlock l( _lock ); + scoped_lock l( _lock ); while( _queue.empty() ) - _condition.wait( l ); + _condition.wait( l.boost() ); T t = _queue.front(); _queue.pop(); @@ -65,7 +65,7 @@ namespace mongo { private: std::queue<T> _queue; - mutable boost::mutex _lock; + mutable mongo::mutex _lock; boost::condition _condition; }; diff --git a/util/sock.cpp b/util/sock.cpp index 5172692..5beac68 100644 --- a/util/sock.cpp +++ b/util/sock.cpp @@ -20,14 +20,14 @@ namespace mongo { - static boost::mutex sock_mutex; + static mongo::mutex sock_mutex; string hostbyname(const char *hostname) { static string unknown = "0.0.0.0"; if ( unknown == hostname ) return unknown; - boostlock lk(sock_mutex); + scoped_lock lk(sock_mutex); #if defined(_WIN32) if( inet_addr(hostname) != INADDR_NONE ) return hostname; diff --git a/util/sock.h b/util/sock.h index 5798a71..ee7a7ae 100644 --- a/util/sock.h +++ b/util/sock.h @@ -245,25 +245,25 @@ namespace mongo { } void add( int sock ){ - boostlock lk( _mutex ); + scoped_lock lk( _mutex ); _sockets->insert( sock ); } void remove( int sock ){ - boostlock lk( _mutex ); + scoped_lock lk( _mutex ); _sockets->erase( sock ); } void closeAll(){ set<int>* s; { - boostlock lk( _mutex ); + scoped_lock lk( _mutex ); s = _sockets; _sockets = new set<int>(); } for ( set<int>::iterator i=s->begin(); i!=s->end(); i++ ){ int sock = *i; - log() << "going to close listening socket: " << sock << endl; + log() << "\t going to close listening socket: " << sock << endl; closesocket( sock ); } @@ -272,7 +272,7 @@ namespace mongo { static ListeningSockets* get(); private: - boost::mutex _mutex; + mongo::mutex _mutex; set<int>* _sockets; static ListeningSockets* _instance; }; diff --git a/util/thread_pool.cpp b/util/thread_pool.cpp index b95bc1d..77d0d05 100644 --- a/util/thread_pool.cpp +++ b/util/thread_pool.cpp @@ -77,7 +77,7 @@ ThreadPool::ThreadPool(int nThreads) : _tasksRemaining(0) , _nThreads(nThreads) { - boostlock lock(_mutex); + scoped_lock lock(_mutex); while (nThreads-- > 0){ Worker* worker = new Worker(*this); _freeWorkers.push_front(worker); @@ -99,14 +99,14 @@ ThreadPool::~ThreadPool(){ } void ThreadPool::join(){ - boostlock lock(_mutex); + scoped_lock lock(_mutex); while(_tasksRemaining){ - _condition.wait(lock); + _condition.wait(lock.boost()); } } void ThreadPool::schedule(Task task){ - boostlock lock(_mutex); + scoped_lock lock(_mutex); _tasksRemaining++; @@ -120,7 +120,7 @@ void ThreadPool::schedule(Task task){ // should only be called by a worker from the worker thread void ThreadPool::task_done(Worker* worker){ - boostlock lock(_mutex); + scoped_lock lock(_mutex); if (!_tasks.empty()){ worker->set_task(_tasks.front()); diff --git a/util/thread_pool.h b/util/thread_pool.h index 91c2969..d891d7d 100644 --- a/util/thread_pool.h +++ b/util/thread_pool.h @@ -62,7 +62,7 @@ namespace threadpool { int tasks_remaining() { return _tasksRemaining; } private: - boost::mutex _mutex; + mongo::mutex _mutex; boost::condition _condition; list<Worker*> _freeWorkers; //used as LIFO stack (always front) diff --git a/util/top.cpp b/util/top.cpp deleted file mode 100644 index 98d9598..0000000 --- a/util/top.cpp +++ /dev/null @@ -1,18 +0,0 @@ -// top.cpp - -#include "stdafx.h" -#include "top.h" - -namespace mongo { - - Top::T Top::_snapshotStart = Top::currentTime(); - Top::D Top::_snapshotDuration; - Top::UsageMap Top::_totalUsage; - Top::UsageMap Top::_snapshotA; - Top::UsageMap Top::_snapshotB; - Top::UsageMap &Top::_snapshot = Top::_snapshotA; - Top::UsageMap &Top::_nextSnapshot = Top::_snapshotB; - boost::mutex Top::topMutex; - - -} diff --git a/util/top.h b/util/top.h deleted file mode 100644 index aaf7c3f..0000000 --- a/util/top.h +++ /dev/null @@ -1,183 +0,0 @@ -// top.h : DB usage monitor. - -/* Copyright 2009 10gen Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include <boost/date_time/posix_time/posix_time.hpp> -#undef assert -#define assert xassert - -namespace mongo { - - /* Records per namespace utilization of the mongod process. - No two functions of this class may be called concurrently. - */ - class Top { - typedef boost::posix_time::ptime T; - typedef boost::posix_time::time_duration D; - typedef boost::tuple< D, int, int, int > UsageData; - public: - Top() : _read(false), _write(false) { } - - /* these are used to record activity: */ - - void clientStart( const char *client ) { - clientStop(); - _currentStart = currentTime(); - _current = client; - } - - /* indicate current request is a read operation. */ - void setRead() { _read = true; } - - void setWrite() { _write = true; } - - void clientStop() { - if ( _currentStart == T() ) - return; - D d = currentTime() - _currentStart; - - { - boostlock L(topMutex); - recordUsage( _current, d ); - } - - _currentStart = T(); - _read = false; - _write = false; - } - - /* these are used to fetch the stats: */ - - struct Usage { - string ns; - D time; - double pct; - int reads, writes, calls; - }; - - static void usage( vector< Usage > &res ) { - boostlock L(topMutex); - - // Populate parent namespaces - UsageMap snapshot; - UsageMap totalUsage; - fillParentNamespaces( snapshot, _snapshot ); - fillParentNamespaces( totalUsage, _totalUsage ); - - multimap< D, string, more > sorted; - for( UsageMap::iterator i = snapshot.begin(); i != snapshot.end(); ++i ) - sorted.insert( make_pair( i->second.get<0>(), i->first ) ); - for( multimap< D, string, more >::iterator i = sorted.begin(); i != sorted.end(); ++i ) { - if ( trivialNs( i->second.c_str() ) ) - continue; - Usage u; - u.ns = i->second; - u.time = totalUsage[ u.ns ].get<0>(); - u.pct = _snapshotDuration != D() ? 100.0 * i->first.ticks() / _snapshotDuration.ticks() : 0; - u.reads = snapshot[ u.ns ].get<1>(); - u.writes = snapshot[ u.ns ].get<2>(); - u.calls = snapshot[ u.ns ].get<3>(); - res.push_back( u ); - } - for( UsageMap::iterator i = totalUsage.begin(); i != totalUsage.end(); ++i ) { - if ( snapshot.count( i->first ) != 0 || trivialNs( i->first.c_str() ) ) - continue; - Usage u; - u.ns = i->first; - u.time = i->second.get<0>(); - u.pct = 0; - u.reads = 0; - u.writes = 0; - u.calls = 0; - res.push_back( u ); - } - } - - static void completeSnapshot() { - boostlock L(topMutex); - - if ( &_snapshot == &_snapshotA ) { - _snapshot = _snapshotB; - _nextSnapshot = _snapshotA; - } else { - _snapshot = _snapshotA; - _nextSnapshot = _snapshotB; - } - _snapshotDuration = currentTime() - _snapshotStart; - _snapshotStart = currentTime(); - _nextSnapshot.clear(); - } - - private: - static boost::mutex topMutex; - static bool trivialNs( const char *ns ) { - const char *ret = strrchr( ns, '.' ); - return ret && ret[ 1 ] == '\0'; - } - typedef map<string,UsageData> UsageMap; // duration, # reads, # writes, # total calls - static T currentTime() { - return boost::posix_time::microsec_clock::universal_time(); - } - void recordUsage( const string &client, D duration ) { - recordUsageForMap( _totalUsage, client, duration ); - recordUsageForMap( _nextSnapshot, client, duration ); - } - void recordUsageForMap( UsageMap &map, const string &client, D duration ) { - UsageData& g = map[client]; - g.get< 0 >() += duration; - if ( _read && !_write ) - g.get< 1 >()++; - else if ( !_read && _write ) - g.get< 2 >()++; - g.get< 3 >()++; - } - static void fillParentNamespaces( UsageMap &to, const UsageMap &from ) { - for( UsageMap::const_iterator i = from.begin(); i != from.end(); ++i ) { - string current = i->first; - size_t dot = current.rfind( "." ); - if ( dot == string::npos || dot != current.length() - 1 ) { - inc( to[ current ], i->second ); - } - while( dot != string::npos ) { - current = current.substr( 0, dot ); - inc( to[ current ], i->second ); - dot = current.rfind( "." ); - } - } - } - static void inc( UsageData &to, const UsageData &from ) { - to.get<0>() += from.get<0>(); - to.get<1>() += from.get<1>(); - to.get<2>() += from.get<2>(); - to.get<3>() += from.get<3>(); - } - struct more { bool operator()( const D &a, const D &b ) { return a > b; } }; - string _current; - T _currentStart; - static T _snapshotStart; - static D _snapshotDuration; - static UsageMap _totalUsage; - static UsageMap _snapshotA; - static UsageMap _snapshotB; - static UsageMap &_snapshot; - static UsageMap &_nextSnapshot; - bool _read; - bool _write; - }; - -} // namespace mongo diff --git a/util/util.cpp b/util/util.cpp index 78d8d52..8ae00f3 100644 --- a/util/util.cpp +++ b/util/util.cpp @@ -18,7 +18,6 @@ #include "stdafx.h" #include "goodies.h" #include "unittest.h" -#include "top.h" #include "file_allocator.h" #include "optime.h" @@ -35,7 +34,7 @@ namespace mongo { const char * (*getcurns)() = default_getcurns; int logLevel = 0; - boost::mutex &Logstream::mutex = *( new boost::mutex ); + mongo::mutex Logstream::mutex; int Logstream::doneSetup = Logstream::magicNumber(); bool goingAway = false; @@ -113,9 +112,9 @@ namespace mongo { #if defined(_WIN32) (std::cout << now << " " << s).flush(); #else - assert( write( STDOUT_FILENO, now, 20 ) > 0 ); - assert( write( STDOUT_FILENO, " ", 1 ) > 0 ); - assert( write( STDOUT_FILENO, s.c_str(), s.length() ) > 0 ); + write( STDOUT_FILENO, now, 20 ); + write( STDOUT_FILENO, " ", 1 ); + write( STDOUT_FILENO, s.c_str(), s.length() ); fsync( STDOUT_FILENO ); #endif } @@ -133,5 +132,12 @@ namespace mongo { ss << "db version v" << versionString << ", pdfile version " << VERSION << "." << VERSION_MINOR; return ss.str(); } - + + ostream& operator<<( ostream &s, const ThreadSafeString &o ){ + s << (string)o; + return s; + } + + bool __destroyingStatics = false; + } // namespace mongo |