diff options
Diffstat (limited to 'util')
72 files changed, 4114 insertions, 2229 deletions
diff --git a/util/alignedbuilder.cpp b/util/alignedbuilder.cpp index 1734431..b2e0461 100644 --- a/util/alignedbuilder.cpp +++ b/util/alignedbuilder.cpp @@ -29,6 +29,35 @@ namespace mongo { BOOST_STATIC_ASSERT(sizeof(void*) == sizeof(size_t)); + /** reset for a re-use. shrinks if > 128MB */ + void AlignedBuilder::reset() { + _len = 0; + RARELY { + const unsigned sizeCap = 128*1024*1024; + if (_p._size > sizeCap) + _realloc(sizeCap, _len); + } + } + + /** reset with a hint as to the upcoming needed size specified */ + void AlignedBuilder::reset(unsigned sz) { + _len = 0; + unsigned Q = 32 * 1024 * 1024 - 1; + unsigned want = (sz+Q) & (~Q); + if( _p._size == want ) { + return; + } + if( _p._size > want ) { + if( _p._size <= 64 * 1024 * 1024 ) + return; + bool downsize = false; + RARELY { downsize = true; } + if( !downsize ) + return; + } + _realloc(want, _len); + } + void AlignedBuilder::mallocSelfAligned(unsigned sz) { assert( sz == _p._size ); void *p = malloc(sz + Alignment - 1); @@ -44,10 +73,20 @@ namespace mongo { /* "slow"/infrequent portion of 'grow()' */ void NOINLINE_DECL AlignedBuilder::growReallocate(unsigned oldLen) { + dassert( _len > _p._size ); unsigned a = _p._size; assert( a ); while( 1 ) { - a *= 2; + if( a < 128 * 1024 * 1024 ) + a *= 2; + else if( sizeof(int*) == 4 ) + a += 32 * 1024 * 1024; + else + a += 64 * 1024 * 1024; + DEV if( a > 256*1024*1024 ) { + log() << "dur AlignedBuilder too big, aborting in _DEBUG build" << endl; + abort(); + } wassert( a <= 256*1024*1024 ); assert( a <= 512*1024*1024 ); if( _len < a ) diff --git a/util/alignedbuilder.h b/util/alignedbuilder.h index 452cec2..1d246a9 100644 --- a/util/alignedbuilder.h +++ b/util/alignedbuilder.h @@ -28,13 +28,11 @@ namespace mongo { AlignedBuilder(unsigned init_size); ~AlignedBuilder() { kill(); } + /** reset with a hint as to the upcoming needed size specified */ + void reset(unsigned sz); + /** reset for a re-use. shrinks if > 128MB */ - void reset() { - _len = 0; - const unsigned sizeCap = 128*1024*1024; - if (_p._size > sizeCap) - _realloc(sizeCap, _len); - } + void reset(); /** note this may be deallocated (realloced) if you keep writing or reset(). */ const char* buf() const { return _p._data; } @@ -48,8 +46,12 @@ namespace mongo { return l; } + /** if buffer grows pointer no longer valid */ char* atOfs(unsigned ofs) { return _p._data + ofs; } + /** if buffer grows pointer no longer valid */ + char* cur() { return _p._data + _len; } + void appendChar(char j) { *((char*)grow(sizeof(char))) = j; } @@ -99,7 +101,7 @@ namespace mongo { inline char* grow(unsigned by) { unsigned oldlen = _len; _len += by; - if ( _len > _p._size ) { + if (MONGO_unlikely( _len > _p._size )) { growReallocate(oldlen); } return _p._data + oldlen; diff --git a/util/array.h b/util/array.h index bf705a4..1282225 100644 --- a/util/array.h +++ b/util/array.h @@ -18,6 +18,12 @@ namespace mongo { + /* + * simple array class that does no allocations + * same api as vector + * fixed buffer, so once capacity is exceeded, will assert + * meant to be-reused with clear() + */ template<typename T> class FastArray { public: @@ -44,6 +50,7 @@ namespace mongo { } void push_back( const T& t ) { + assert( _size < _capacity ); _data[_size++] = t; } diff --git a/util/assert_util.cpp b/util/assert_util.cpp index 8280d8b..da039c0 100644 --- a/util/assert_util.cpp +++ b/util/assert_util.cpp @@ -62,18 +62,34 @@ namespace mongo { b.append( c , code ); } - string getDbContext(); /* "warning" assert -- safe to continue, so we don't throw exception. */ - void wasserted(const char *msg, const char *file, unsigned line) { - problem() << "Assertion failure " << msg << ' ' << file << ' ' << dec << line << endl; + NOINLINE_DECL void wasserted(const char *msg, const char *file, unsigned line) { + static bool rateLimited; + static time_t lastWhen; + static unsigned lastLine; + if( lastLine == line && time(0)-lastWhen < 5 ) { + if( rateLimited++ == 0 ) { + log() << "rate limiting wassert" << endl; + } + return; + } + lastWhen = time(0); + lastLine = line; + + problem() << "warning assertion failure " << msg << ' ' << file << ' ' << dec << line << endl; sayDbContext(); raiseError(0,msg && *msg ? msg : "wassertion failure"); assertionCount.condrollover( ++assertionCount.warning ); +#if defined(_DEBUG) || defined(_DURABLEDEFAULTON) || defined(_DURABLEDEFAULTOFF) + // this is so we notice in buildbot + log() << "\n\n***aborting after wassert() failure in a debug/test build\n\n" << endl; + abort(); +#endif } - void asserted(const char *msg, const char *file, unsigned line) { + NOINLINE_DECL void asserted(const char *msg, const char *file, unsigned line) { assertionCount.condrollover( ++assertionCount.regular ); problem() << "Assertion failure " << msg << ' ' << file << ' ' << dec << line << endl; sayDbContext(); @@ -82,6 +98,28 @@ namespace mongo { temp << "assertion " << file << ":" << line; AssertionException e(temp.str(),0); breakpoint(); +#if defined(_DEBUG) || defined(_DURABLEDEFAULTON) || defined(_DURABLEDEFAULTOFF) + // this is so we notice in buildbot + log() << "\n\n***aborting after assert() failure in a debug/test build\n\n" << endl; + abort(); +#endif + throw e; + } + + NOINLINE_DECL void verifyFailed( int msgid ) { + assertionCount.condrollover( ++assertionCount.regular ); + problem() << "Assertion failure " << msgid << endl; + sayDbContext(); + raiseError(0,"assertion failure"); + stringstream temp; + temp << msgid; + AssertionException e(temp.str(),0); + breakpoint(); +#if defined(_DEBUG) || defined(_DURABLEDEFAULTON) || defined(_DURABLEDEFAULTOFF) + // this is so we notice in buildbot + log() << "\n\n***aborting after verify() failure in a debug/test build\n\n" << endl; + abort(); +#endif throw e; } @@ -89,14 +127,14 @@ namespace mongo { raiseError(0,msg); } - void uasserted(int msgid, const char *msg) { + NOINLINE_DECL void uasserted(int msgid, const char *msg) { assertionCount.condrollover( ++assertionCount.user ); LOG(1) << "User Assertion: " << msgid << ":" << msg << endl; raiseError(msgid,msg); throw UserException(msgid, msg); } - void msgasserted(int msgid, const char *msg) { + NOINLINE_DECL void msgasserted(int msgid, const char *msg) { assertionCount.condrollover( ++assertionCount.warning ); tlog() << "Assertion: " << msgid << ":" << msg << endl; raiseError(msgid,msg && *msg ? msg : "massert failure"); @@ -105,14 +143,14 @@ namespace mongo { throw MsgAssertionException(msgid, msg); } - void msgassertedNoTrace(int msgid, const char *msg) { + NOINLINE_DECL void msgassertedNoTrace(int msgid, const char *msg) { assertionCount.condrollover( ++assertionCount.warning ); log() << "Assertion: " << msgid << ":" << msg << endl; raiseError(msgid,msg && *msg ? msg : "massert failure"); throw MsgAssertionException(msgid, msg); } - void streamNotGood( int code , string msg , std::ios& myios ) { + NOINLINE_DECL void streamNotGood( int code , string msg , std::ios& myios ) { stringstream ss; // errno might not work on all systems for streams // if it doesn't for a system should deal with here @@ -144,5 +182,22 @@ namespace mongo { #endif } + NOINLINE_DECL ErrorMsg::ErrorMsg(const char *msg, char ch) { + int l = strlen(msg); + assert( l < 128); + memcpy(buf, msg, l); + char *p = buf + l; + p[0] = ch; + p[1] = 0; + } + + NOINLINE_DECL ErrorMsg::ErrorMsg(const char *msg, unsigned val) { + int l = strlen(msg); + assert( l < 128); + memcpy(buf, msg, l); + char *p = buf + l; + sprintf(p, "%u", val); + } + } diff --git a/util/assert_util.h b/util/assert_util.h index 151e950..b4c68b7 100644 --- a/util/assert_util.h +++ b/util/assert_util.h @@ -20,6 +20,13 @@ #include "../db/lasterror.h" +// MONGO_NORETURN undefed at end of file +#ifdef __GNUC__ +# define MONGO_NORETURN __attribute__((__noreturn__)) +#else +# define MONGO_NORETURN +#endif + namespace mongo { enum CommonErrorCodes { @@ -53,11 +60,28 @@ namespace mongo { void append( BSONObjBuilder& b , const char * m = "$err" , const char * c = "code" ) const ; string toString() const { stringstream ss; ss << "exception: " << code << " " << msg; return ss.str(); } bool empty() const { return msg.empty(); } + + void reset(){ msg = ""; code=-1; } string msg; int code; }; + /** helper class that builds error strings. lighter weight than a StringBuilder, albeit less flexible. + NOINLINE_DECL used in the constructor implementations as we are assuming this is a cold code path when used. + + example: + throw UserException(123, ErrorMsg("blah", num_val)); + */ + class ErrorMsg { + public: + ErrorMsg(const char *msg, char ch); + ErrorMsg(const char *msg, unsigned val); + operator string() const { return buf; } + private: + char buf[256]; + }; + class DBException : public std::exception { public: DBException( const ExceptionInfo& ei ) : _ei(ei) {} @@ -117,14 +141,14 @@ namespace mongo { virtual void appendPrefix( stringstream& ss ) const { ss << "massert:"; } }; - - void asserted(const char *msg, const char *file, unsigned line); + void asserted(const char *msg, const char *file, unsigned line) MONGO_NORETURN; void wasserted(const char *msg, const char *file, unsigned line); - + void verifyFailed( int msgid ); + /** a "user assertion". throws UserAssertion. logs. typically used for errors that a user - could cause, such as dupliate key, disk full, etc. + could cause, such as duplicate key, disk full, etc. */ - void uasserted(int msgid, const char *msg); + void uasserted(int msgid, const char *msg) MONGO_NORETURN; inline void uasserted(int msgid , string msg) { uasserted(msgid, msg.c_str()); } /** reported via lasterror, but don't throw exception */ @@ -133,24 +157,33 @@ namespace mongo { /** msgassert and massert are for errors that are internal but have a well defined error text string. a stack trace is logged. */ - void msgassertedNoTrace(int msgid, const char *msg); + void msgassertedNoTrace(int msgid, const char *msg) MONGO_NORETURN; inline void msgassertedNoTrace(int msgid, const string& msg) { msgassertedNoTrace( msgid , msg.c_str() ); } - void msgasserted(int msgid, const char *msg); + void msgasserted(int msgid, const char *msg) MONGO_NORETURN; inline void msgasserted(int msgid, string msg) { msgasserted(msgid, msg.c_str()); } + /* convert various types of exceptions to strings */ + inline string causedBy( const char* e ){ return (string)" :: caused by :: " + e; } + inline string causedBy( const DBException& e ){ return causedBy( e.toString().c_str() ); } + inline string causedBy( const std::exception& e ){ return causedBy( e.what() ); } + inline string causedBy( const string& e ){ return causedBy( e.c_str() ); } + + /** in the mongodb source, use verify() instead of assert(). verify is always evaluated even in release builds. */ + inline void verify( int msgid , bool testOK ) { if ( ! testOK ) verifyFailed( msgid ); } + #ifdef assert #undef assert #endif -#define MONGO_assert(_Expression) (void)( (!!(_Expression)) || (mongo::asserted(#_Expression, __FILE__, __LINE__), 0) ) +#define MONGO_assert(_Expression) (void)( MONGO_likely(!!(_Expression)) || (mongo::asserted(#_Expression, __FILE__, __LINE__), 0) ) #define assert MONGO_assert /* "user assert". if asserts, user did something wrong, not our code */ -#define MONGO_uassert(msgid, msg, expr) (void)( (!!(expr)) || (mongo::uasserted(msgid, msg), 0) ) +#define MONGO_uassert(msgid, msg, expr) (void)( MONGO_likely(!!(expr)) || (mongo::uasserted(msgid, msg), 0) ) #define uassert MONGO_uassert /* warning only - keeps going */ -#define MONGO_wassert(_Expression) (void)( (!!(_Expression)) || (mongo::wasserted(#_Expression, __FILE__, __LINE__), 0) ) +#define MONGO_wassert(_Expression) (void)( MONGO_likely(!!(_Expression)) || (mongo::wasserted(#_Expression, __FILE__, __LINE__), 0) ) #define wassert MONGO_wassert /* display a message, no context, and throw assertionexception @@ -158,7 +191,7 @@ namespace mongo { easy way to throw an exception and log something without our stack trace display happening. */ -#define MONGO_massert(msgid, msg, expr) (void)( (!!(expr)) || (mongo::msgasserted(msgid, msg), 0) ) +#define MONGO_massert(msgid, msg, expr) (void)( MONGO_likely(!!(expr)) || (mongo::msgasserted(msgid, msg), 0) ) #define massert MONGO_massert /* dassert is 'debug assert' -- might want to turn off for production as these @@ -179,7 +212,7 @@ namespace mongo { enum { ASSERT_ID_DUPKEY = 11000 }; /* throws a uassertion with an appropriate msg */ - void streamNotGood( int code , string msg , std::ios& myios ); + void streamNotGood( int code , string msg , std::ios& myios ) MONGO_NORETURN; inline void assertStreamGood(unsigned msgid, string msg, std::ios& myios) { if( !myios.good() ) streamNotGood(msgid, msg, myios); @@ -195,10 +228,21 @@ namespace mongo { expression; \ } catch ( const std::exception &e ) { \ stringstream ss; \ - ss << "caught boost exception: " << e.what(); \ - msgasserted( 13294 , ss.str() ); \ + ss << "caught boost exception: " << e.what() << ' ' << __FILE__ << ' ' << __LINE__; \ + msgasserted( 13294 , ss.str() ); \ + } catch ( ... ) { \ + massert( 10437 , "unknown boost failed" , false ); \ + } + +#define MONGO_BOOST_CHECK_EXCEPTION_WITH_MSG( expression, msg ) \ + try { \ + expression; \ + } catch ( const std::exception &e ) { \ + stringstream ss; \ + ss << msg << " caught boost exception: " << e.what(); \ + msgasserted( 14043 , ss.str() ); \ } catch ( ... ) { \ - massert( 10437 , "unknown boost failed" , false ); \ + msgasserted( 14044 , string("unknown boost failed ") + msg ); \ } #define DESTRUCTOR_GUARD MONGO_DESTRUCTOR_GUARD @@ -210,3 +254,5 @@ namespace mongo { } catch ( ... ) { \ problem() << "caught unknown exception in destructor (" << __FUNCTION__ << ")" << endl; \ } + +#undef MONGO_NORETURN diff --git a/util/background.cpp b/util/background.cpp index 746d14c..215b271 100644 --- a/util/background.cpp +++ b/util/background.cpp @@ -18,8 +18,11 @@ #include "pch.h" #include "concurrency/mutex.h" +#include "concurrency/spin_lock.h" #include "background.h" +#include "time_support.h" +#include "timer.h" #include "mongoutils/str.h" @@ -80,6 +83,7 @@ namespace mongo { } bool BackgroundJob::wait( unsigned msTimeOut ) { + assert( !_status->deleteSelf ); // you cannot call wait on a self-deleting job scoped_lock l( _status->m ); while ( _status->state != Done ) { if ( msTimeOut ) { @@ -117,4 +121,70 @@ namespace mongo { return _status->state == Running; } + // ------------------------- + + PeriodicTask::PeriodicTask() { + if ( ! theRunner ) + theRunner = new Runner(); + theRunner->add( this ); + } + + PeriodicTask::~PeriodicTask() { + theRunner->remove( this ); + } + + void PeriodicTask::Runner::add( PeriodicTask* task ) { + scoped_spinlock lk( _lock ); + _tasks.push_back( task ); + } + + void PeriodicTask::Runner::remove( PeriodicTask* task ) { + scoped_spinlock lk( _lock ); + for ( size_t i=0; i<_tasks.size(); i++ ) { + if ( _tasks[i] == task ) { + _tasks[i] = 0; + break; + } + } + } + + void PeriodicTask::Runner::run() { + int sleeptime = 60; + DEV sleeptime = 5; // to catch race conditions + + while ( ! inShutdown() ) { + + sleepsecs( sleeptime ); + + scoped_spinlock lk( _lock ); + + size_t size = _tasks.size(); + + for ( size_t i=0; i<size; i++ ) { + PeriodicTask * t = _tasks[i]; + if ( ! t ) + continue; + + if ( inShutdown() ) + break; + + Timer timer; + try { + t->taskDoWork(); + } + catch ( std::exception& e ) { + error() << "task: " << t->taskName() << " failed: " << e.what() << endl; + } + catch ( ... ) { + error() << "task: " << t->taskName() << " failed with unknown error" << endl; + } + + int ms = timer.millis(); + LOG( ms <= 3 ? 1 : 0 ) << "task: " << t->taskName() << " took: " << ms << "ms" << endl; + } + } + } + + PeriodicTask::Runner* PeriodicTask::theRunner = 0; + } // namespace mongo diff --git a/util/background.h b/util/background.h index 861df9b..496a1f4 100644 --- a/util/background.h +++ b/util/background.h @@ -17,6 +17,8 @@ #pragma once +#include "concurrency/spin_lock.h" + namespace mongo { /** @@ -102,5 +104,52 @@ namespace mongo { void jobBody( boost::shared_ptr<JobStatus> status ); }; + + /** + * these run "roughly" every minute + * instantiate statically + * class MyTask : public PeriodicTask { + * public: + * virtual string name() const { return "MyTask; " } + * virtual void doWork() { log() << "hi" << endl; } + * } myTask; + */ + class PeriodicTask { + public: + PeriodicTask(); + virtual ~PeriodicTask(); + + virtual void taskDoWork() = 0; + virtual string taskName() const = 0; + + class Runner : public BackgroundJob { + public: + virtual ~Runner(){} + + virtual string name() const { return "PeriodicTask::Runner"; } + + virtual void run(); + + void add( PeriodicTask* task ); + void remove( PeriodicTask* task ); + + private: + + SpinLock _lock; + + // these are NOT owned by Runner + // Runner will not delete these + // this never gets smaller + // only fields replaced with nulls + vector<PeriodicTask*> _tasks; + + }; + + static Runner* theRunner; + + }; + + + } // namespace mongo diff --git a/util/bson_util.h b/util/bson_util.h new file mode 100644 index 0000000..973e31f --- /dev/null +++ b/util/bson_util.h @@ -0,0 +1,42 @@ +// bson_util.h + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "../pch.h" + +namespace mongo { + +template <typename T> +void bsonArrToNumVector(BSONElement el, vector<T>& results){ + + if(el.type() == Array){ + + vector<BSONElement> elements = el.Array(); + + for(vector<BSONElement>::iterator i = elements.begin(); i != elements.end(); ++i){ + results.push_back( (T) (*i).Number() ); + } + } + else if(el.isNumber()){ + results.push_back( (T) el.Number() ); + } + +} + + +} diff --git a/util/bufreader.h b/util/bufreader.h index a0dcefa..53f0ba7 100644 --- a/util/bufreader.h +++ b/util/bufreader.h @@ -28,6 +28,7 @@ namespace mongo { public: class eof : public std::exception { public: + eof() { } virtual const char * what() { return "BufReader eof"; } }; @@ -88,6 +89,7 @@ namespace mongo { } const void* pos() { return _pos; } + const void* start() { return _start; } private: const void *_start; diff --git a/util/checksum.h b/util/checksum.h new file mode 100644 index 0000000..009ab56 --- /dev/null +++ b/util/checksum.h @@ -0,0 +1,37 @@ +#pragma once
+#include "../pch.h"
+namespace mongo {
+ /** a simple, rather dumb, but very fast checksum. see perftests.cpp for unit tests. */
+ struct Checksum {
+ union {
+ unsigned char bytes[16];
+ unsigned long long words[2];
+ };
+
+ // if you change this you must bump dur::CurrentVersion
+ void gen(const void *buf, unsigned len) {
+ wassert( ((size_t)buf) % 8 == 0 ); // performance warning
+ unsigned n = len / 8 / 2;
+ const unsigned long long *p = (const unsigned long long *) buf;
+ unsigned long long a = 0;
+ for( unsigned i = 0; i < n; i++ ) {
+ a += (*p ^ i);
+ p++;
+ }
+ unsigned long long b = 0;
+ for( unsigned i = 0; i < n; i++ ) {
+ b += (*p ^ i);
+ p++;
+ }
+ unsigned long long c = 0;
+ for( unsigned i = n * 2 * 8; i < len; i++ ) { // 0-7 bytes left
+ c = (c << 8) | ((const char *)buf)[i];
+ }
+ words[0] = a ^ len;
+ words[1] = b ^ c;
+ }
+
+ bool operator==(const Checksum& rhs) const { return words[0]==rhs.words[0] && words[1]==rhs.words[1]; }
+ bool operator!=(const Checksum& rhs) const { return words[0]!=rhs.words[0] || words[1]!=rhs.words[1]; }
+ };
+}
diff --git a/util/compress.cpp b/util/compress.cpp new file mode 100644 index 0000000..bcde488 --- /dev/null +++ b/util/compress.cpp @@ -0,0 +1,31 @@ +// @file compress.cpp + +#include "../third_party/snappy/snappy.h" +#include "compress.h" +#include <string> +#include <string.h> +#include <assert.h> + +namespace mongo { + + void rawCompress(const char* input, + size_t input_length, + char* compressed, + size_t* compressed_length) + { + snappy::RawCompress(input, input_length, compressed, compressed_length); + } + + size_t maxCompressedLength(size_t source_len) { + return snappy::MaxCompressedLength(source_len); + } + + size_t compress(const char* input, size_t input_length, std::string* output) { + return snappy::Compress(input, input_length, output); + } + + bool uncompress(const char* compressed, size_t compressed_length, std::string* uncompressed) { + return snappy::Uncompress(compressed, compressed_length, uncompressed); + } + +} diff --git a/util/compress.h b/util/compress.h new file mode 100644 index 0000000..5bc5a33 --- /dev/null +++ b/util/compress.h @@ -0,0 +1,21 @@ +// @file compress.h + +#pragma once + +#include <string> + +namespace mongo { + + size_t compress(const char* input, size_t input_length, std::string* output); + + bool uncompress(const char* compressed, size_t compressed_length, std::string* uncompressed); + + size_t maxCompressedLength(size_t source_len); + void rawCompress(const char* input, + size_t input_length, + char* compressed, + size_t* compressed_length); + +} + + diff --git a/util/concurrency/list.h b/util/concurrency/list.h index e5eaec6..01bae6f 100644 --- a/util/concurrency/list.h +++ b/util/concurrency/list.h @@ -42,38 +42,54 @@ namespace mongo { friend class List1; T *_next; public: + Base() : _next(0){} + ~Base() { wassert(false); } // we never want this to happen T* next() const { return _next; } }; - T* head() const { return _head; } + /** note this is safe: + + T* p = mylist.head(); + if( p ) + use(p); + + and this is not: + + if( mylist.head() ) + use( mylist.head() ); // could become 0 + */ + T* head() const { return (T*) _head; } void push(T* t) { + assert( t->_next == 0 ); scoped_lock lk(_m); - t->_next = _head; + t->_next = (T*) _head; _head = t; } - // intentionally leak. + // intentionally leaks. void orphanAll() { + scoped_lock lk(_m); _head = 0; } /* t is not deleted, but is removed from the list. (orphaned) */ void orphan(T* t) { scoped_lock lk(_m); - T *&prev = _head; + T *&prev = (T*&) _head; T *n = prev; while( n != t ) { + uassert( 14050 , "List1: item to orphan not in list", n ); prev = n->_next; n = prev; } prev = t->_next; if( ++_orphans > 500 ) - log() << "warning orphans=" << _orphans << '\n'; + log() << "warning List1 orphans=" << _orphans << '\n'; } private: - T *_head; + volatile T *_head; mongo::mutex _m; int _orphans; }; diff --git a/util/concurrency/mutex.h b/util/concurrency/mutex.h index c463498..f17c3f0 100644 --- a/util/concurrency/mutex.h +++ b/util/concurrency/mutex.h @@ -19,11 +19,12 @@ #include <map> #include <set> - #include "../heapcheck.h" namespace mongo { + void printStackTrace( ostream &o ); + class mutex; inline boost::xtime incxtimemillis( long long s ) { @@ -50,7 +51,6 @@ namespace mongo { map< mid, set<mid> > followers; boost::mutex &x; unsigned magic; - void aBreakPoint() { } // for debugging public: // set these to create an assert that @@ -147,20 +147,16 @@ namespace mongo { ~StaticObserver() { _destroyingStatics = true; } }; - /** On pthread systems, it is an error to destroy a mutex while held. Static global - * mutexes may be held upon shutdown in our implementation, and this way we avoid - * destroying them. - * NOT recursive. + /** On pthread systems, it is an error to destroy a mutex while held (boost mutex + * may use pthread). Static global mutexes may be held upon shutdown in our + * implementation, and this way we avoid destroying them. + * NOT recursive. */ class mutex : boost::noncopyable { public: #if defined(_DEBUG) const char * const _name; -#endif - -#if defined(_DEBUG) - mutex(const char *name) - : _name(name) + mutex(const char *name) : _name(name) #else mutex(const char *) #endif @@ -184,44 +180,47 @@ namespace mongo { #else ok( _l.locked() ) #endif - { - } - - ~try_lock() { - } - + { } private: boost::timed_mutex::scoped_timed_lock _l; - public: const bool ok; }; - class scoped_lock : boost::noncopyable { + public: #if defined(_DEBUG) - mongo::mutex *mut; + struct PostStaticCheck { + PostStaticCheck() { + if ( StaticObserver::_destroyingStatics ) { + cout << "trying to lock a mongo::mutex during static shutdown" << endl; + printStackTrace( cout ); + } + } + }; + + PostStaticCheck _check; + mongo::mutex * const _mut; #endif - public: - scoped_lock( mongo::mutex &m ) : _l( m.boost() ) { + scoped_lock( mongo::mutex &m ) : +#if defined(_DEBUG) + _mut(&m), +#endif + _l( m.boost() ) { #if defined(_DEBUG) - mut = &m; - mutexDebugger.entering(mut->_name); + mutexDebugger.entering(_mut->_name); #endif } ~scoped_lock() { #if defined(_DEBUG) - mutexDebugger.leaving(mut->_name); + mutexDebugger.leaving(_mut->_name); #endif } boost::timed_mutex::scoped_lock &boost() { return _l; } private: boost::timed_mutex::scoped_lock _l; }; - - private: - boost::timed_mutex &boost() { return *_m; } boost::timed_mutex *_m; }; @@ -229,4 +228,52 @@ namespace mongo { typedef mutex::scoped_lock scoped_lock; typedef boost::recursive_mutex::scoped_lock recursive_scoped_lock; + /** The concept with SimpleMutex is that it is a basic lock/unlock with no + special functionality (such as try and try timeout). Thus it can be + implemented using OS-specific facilities in all environments (if desired). + On Windows, the implementation below is faster than boost mutex. + */ +#if defined(_WIN32) + class SimpleMutex : boost::noncopyable { + CRITICAL_SECTION _cs; + public: + SimpleMutex(const char *name) { InitializeCriticalSection(&_cs); } + ~SimpleMutex() { DeleteCriticalSection(&_cs); } + + void lock() { EnterCriticalSection(&_cs); } + void unlock() { LeaveCriticalSection(&_cs); } + + class scoped_lock : boost::noncopyable { + SimpleMutex& _m; + public: + scoped_lock( SimpleMutex &m ) : _m(m) { _m.lock(); } + ~scoped_lock() { _m.unlock(); } + }; + }; +#else + class SimpleMutex : boost::noncopyable { + public: + SimpleMutex(const char* name) { assert( pthread_mutex_init(&_lock,0) == 0 ); } + ~SimpleMutex(){ + if ( ! StaticObserver::_destroyingStatics ) { + assert( pthread_mutex_destroy(&_lock) == 0 ); + } + } + + void lock() { assert( pthread_mutex_lock(&_lock) == 0 ); } + void unlock() { assert( pthread_mutex_unlock(&_lock) == 0 ); } + + class scoped_lock : boost::noncopyable { + SimpleMutex& _m; + public: + scoped_lock( SimpleMutex &m ) : _m(m) { _m.lock(); } + ~scoped_lock() { _m.unlock(); } + }; + + private: + pthread_mutex_t _lock; + }; + +#endif + } diff --git a/util/concurrency/race.h b/util/concurrency/race.h index 0b8338c..4644e37 100644 --- a/util/concurrency/race.h +++ b/util/concurrency/race.h @@ -19,15 +19,56 @@ namespace mongo { the same time. Also detects and disallows recursion. */ +#ifdef _WIN32 + typedef unsigned threadId_t; +#else + typedef pthread_t threadId_t; +#endif + + #if defined(_DEBUG) + namespace race { + + class CodePoint { + public: + string lastName; + threadId_t lastTid; + string file; + CodePoint(string f) : lastTid(0), file(f) { } + }; + class Check { + public: + Check(CodePoint& p) { + threadId_t t = GetCurrentThreadId(); + if( p.lastTid == 0 ) { + p.lastTid = t; + p.lastName = getThreadName(); + } + else if( t != p.lastTid ) { + log() << "\n\n\n\n\nRACE? error assert\n " << p.file << '\n' + << " " << p.lastName + << " " << getThreadName() << "\n\n" << endl; + mongoAbort("racecheck"); + } + }; + }; + + } + +#define RACECHECK + // dm TODO - the right code for this file is in a different branch at the moment (merge) + //#define RACECHECK + //static race::CodePoint __cp(__FILE__); + //race::Check __ck(__cp); + class CodeBlock { volatile int n; - unsigned tid; + threadId_t tid; void fail() { log() << "synchronization (race condition) failure" << endl; printStackTrace(); - abort(); + ::abort(); } void enter() { if( ++n != 1 ) fail(); @@ -58,6 +99,8 @@ namespace mongo { #else +#define RACECHECK + class CodeBlock{ public: class Within { @@ -69,4 +112,4 @@ namespace mongo { #endif -} +} // namespace diff --git a/util/concurrency/rwlock.h b/util/concurrency/rwlock.h index ca81a9f..d8a11ea 100644 --- a/util/concurrency/rwlock.h +++ b/util/concurrency/rwlock.h @@ -21,9 +21,11 @@ #include "mutex.h" #include "../time_support.h" -// this requires Vista+ to work +// this requires newer windows versions // it works better than sharable_mutex under high contention +#if defined(_WIN64) //#define MONGO_USE_SRW_ON_WINDOWS 1 +#endif #if !defined(MONGO_USE_SRW_ON_WINDOWS) @@ -55,131 +57,153 @@ namespace mongo { #if defined(MONGO_USE_SRW_ON_WINDOWS) && defined(_WIN32) - class RWLock { + // Windows RWLock implementation (requires newer versions of windows thus the above macro) + class RWLock : boost::noncopyable { public: - RWLock(const char *) { InitializeSRWLock(&_lock); } + RWLock(const char *, int lowPriorityWaitMS=0 ) : _lowPriorityWaitMS(lowPriorityWaitMS) + { InitializeSRWLock(&_lock); } ~RWLock() { } + const char * implType() const { return "WINSRW"; } + int lowPriorityWaitMS() const { return _lowPriorityWaitMS; } void lock() { AcquireSRWLockExclusive(&_lock); } void unlock() { ReleaseSRWLockExclusive(&_lock); } void lock_shared() { AcquireSRWLockShared(&_lock); } void unlock_shared() { ReleaseSRWLockShared(&_lock); } bool lock_shared_try( int millis ) { + if( TryAcquireSRWLockShared(&_lock) ) + return true; + if( millis == 0 ) + return false; unsigned long long end = curTimeMicros64() + millis*1000; while( 1 ) { + Sleep(1); if( TryAcquireSRWLockShared(&_lock) ) return true; if( curTimeMicros64() >= end ) break; - Sleep(1); } return false; } bool lock_try( int millis = 0 ) { + if( TryAcquireSRWLockExclusive(&_lock) ) // quick check to optimistically avoid calling curTimeMicros64 + return true; + if( millis == 0 ) + return false; unsigned long long end = curTimeMicros64() + millis*1000; - while( 1 ) { + do { + Sleep(1); if( TryAcquireSRWLockExclusive(&_lock) ) return true; - if( curTimeMicros64() >= end ) - break; - Sleep(1); - } + } while( curTimeMicros64() < end ); return false; } private: SRWLOCK _lock; + const int _lowPriorityWaitMS; }; #elif defined(BOOST_RWLOCK) - class RWLock { + + // Boost based RWLock implementation + class RWLock : boost::noncopyable { shared_mutex _m; + const int _lowPriorityWaitMS; public: -#if defined(_DEBUG) - const char *_name; - RWLock(const char *name) : _name(name) { } -#else - RWLock(const char *) { } -#endif + const char * const _name; + + RWLock(const char *name, int lowPriorityWait=0) : _lowPriorityWaitMS(lowPriorityWait) , _name(name) { } + + const char * implType() const { return "boost"; } + + int lowPriorityWaitMS() const { return _lowPriorityWaitMS; } + void lock() { - _m.lock(); -#if defined(_DEBUG) - mutexDebugger.entering(_name); -#endif + _m.lock(); + DEV mutexDebugger.entering(_name); } + + /*void lock() { + // This sequence gives us the lock semantics we want: specifically that write lock acquisition is + // greedy EXCEPT when someone already is in upgradable state. + lockAsUpgradable(); + upgrade(); + DEV mutexDebugger.entering(_name); + }*/ + void unlock() { -#if defined(_DEBUG) - mutexDebugger.leaving(_name); -#endif + DEV mutexDebugger.leaving(_name); _m.unlock(); } + void lockAsUpgradable() { + _m.lock_upgrade(); + } + void unlockFromUpgradable() { // upgradable -> unlocked + _m.unlock_upgrade(); + } + void upgrade() { // upgradable -> exclusive lock + _m.unlock_upgrade_and_lock(); + } + void lock_shared() { _m.lock_shared(); } - void unlock_shared() { _m.unlock_shared(); } bool lock_shared_try( int millis ) { - boost::system_time until = get_system_time(); - until += boost::posix_time::milliseconds(millis); - if( _m.timed_lock_shared( until ) ) { + if( _m.timed_lock_shared( boost::posix_time::milliseconds(millis) ) ) { return true; } return false; } bool lock_try( int millis = 0 ) { - boost::system_time until = get_system_time(); - until += boost::posix_time::milliseconds(millis); - if( _m.timed_lock( until ) ) { -#if defined(_DEBUG) - mutexDebugger.entering(_name); -#endif + if( _m.timed_lock( boost::posix_time::milliseconds(millis) ) ) { + DEV mutexDebugger.entering(_name); return true; } return false; } - - }; + #else - class RWLock { - pthread_rwlock_t _lock; - inline void check( int x ) { - if( x == 0 ) + // Posix RWLock implementation + class RWLock : boost::noncopyable { + pthread_rwlock_t _lock; + const int _lowPriorityWaitMS; + static void check( int x ) { + if( MONGO_likely(x == 0) ) return; log() << "pthread rwlock failed: " << x << endl; assert( x == 0 ); } - + public: -#if defined(_DEBUG) const char *_name; - RWLock(const char *name) : _name(name) { -#else - RWLock(const char *) { -#endif + RWLock(const char *name, int lowPriorityWaitMS=0) : _lowPriorityWaitMS(lowPriorityWaitMS), _name(name) + { check( pthread_rwlock_init( &_lock , 0 ) ); } - + ~RWLock() { if ( ! StaticObserver::_destroyingStatics ) { - check( pthread_rwlock_destroy( &_lock ) ); + wassert( pthread_rwlock_destroy( &_lock ) == 0 ); // wassert as don't want to throw from a destructor } } + const char * implType() const { return "posix"; } + + int lowPriorityWaitMS() const { return _lowPriorityWaitMS; } + void lock() { check( pthread_rwlock_wrlock( &_lock ) ); -#if defined(_DEBUG) - mutexDebugger.entering(_name); -#endif + DEV mutexDebugger.entering(_name); } void unlock() { -#if defined(_DEBUG) mutexDebugger.leaving(_name); -#endif check( pthread_rwlock_unlock( &_lock ) ); } @@ -197,9 +221,7 @@ namespace mongo { bool lock_try( int millis = 0 ) { if( _try( millis , true ) ) { -#if defined(_DEBUG) - mutexDebugger.entering(_name); -#endif + DEV mutexDebugger.entering(_name); return true; } return false; @@ -233,7 +255,7 @@ namespace mongo { #endif /** throws on failure to acquire in the specified time period. */ - class rwlock_try_write { + class rwlock_try_write : boost::noncopyable { public: struct exception { }; rwlock_try_write(RWLock& l, int millis = 0) : _l(l) { @@ -245,16 +267,57 @@ namespace mongo { RWLock& _l; }; + class rwlock_shared : boost::noncopyable { + public: + rwlock_shared(RWLock& rwlock) : _r(rwlock) {_r.lock_shared(); } + ~rwlock_shared() { _r.unlock_shared(); } + private: + RWLock& _r; + }; + /* scoped lock for RWLock */ - class rwlock { + class rwlock : boost::noncopyable { public: - rwlock( const RWLock& lock , bool write , bool alreadyHaveLock = false ) + /** + * @param write acquire write lock if true sharable if false + * @param lowPriority if > 0, will try to get the lock non-greedily for that many ms + */ + rwlock( const RWLock& lock , bool write, /* bool alreadyHaveLock = false , */int lowPriorityWaitMS = 0 ) : _lock( (RWLock&)lock ) , _write( write ) { - if ( ! alreadyHaveLock ) { - if ( _write ) - _lock.lock(); - else + + { + if ( _write ) { + + if ( ! lowPriorityWaitMS && lock.lowPriorityWaitMS() ) + lowPriorityWaitMS = lock.lowPriorityWaitMS(); + + if ( lowPriorityWaitMS ) { + bool got = false; + for ( int i=0; i<lowPriorityWaitMS; i++ ) { + if ( _lock.lock_try(0) ) { + got = true; + break; + } + + int sleep = 1; + if ( i > ( lowPriorityWaitMS / 20 ) ) + sleep = 10; + sleepmillis(sleep); + i += ( sleep - 1 ); + } + if ( ! got ) { + log() << "couldn't get lazy rwlock" << endl; + _lock.lock(); + } + } + else { + _lock.lock(); + } + + } + else { _lock.lock_shared(); + } } } ~rwlock() { @@ -267,4 +330,67 @@ namespace mongo { RWLock& _lock; const bool _write; }; + + /** recursive on shared locks is ok for this implementation */ + class RWLockRecursive : boost::noncopyable { + ThreadLocalValue<int> _state; + RWLock _lk; + friend class Exclusive; + public: + /** @param lpwaitms lazy wait */ + RWLockRecursive(const char *name, int lpwaitms) : _lk(name, lpwaitms) { } + + void assertExclusivelyLocked() { + dassert( _state.get() < 0 ); + } + + // RWLockRecursive::Exclusive scoped lock + class Exclusive : boost::noncopyable { + RWLockRecursive& _r; + rwlock *_scopedLock; + public: + Exclusive(RWLockRecursive& r) : _r(r), _scopedLock(0) { + int s = _r._state.get(); + dassert( s <= 0 ); + if( s == 0 ) + _scopedLock = new rwlock(_r._lk, true); + _r._state.set(s-1); + } + ~Exclusive() { + int s = _r._state.get(); + DEV wassert( s < 0 ); // wassert: don't throw from destructors + _r._state.set(s+1); + delete _scopedLock; + } + }; + + // RWLockRecursive::Shared scoped lock + class Shared : boost::noncopyable { + RWLockRecursive& _r; + bool _alreadyExclusive; + public: + Shared(RWLockRecursive& r) : _r(r) { + int s = _r._state.get(); + _alreadyExclusive = s < 0; + if( !_alreadyExclusive ) { + dassert( s >= 0 ); // -1 would mean exclusive + if( s == 0 ) + _r._lk.lock_shared(); + _r._state.set(s+1); + } + } + ~Shared() { + if( _alreadyExclusive ) { + DEV wassert( _r._state.get() < 0 ); + } + else { + int s = _r._state.get() - 1; + if( s == 0 ) + _r._lk.unlock_shared(); + _r._state.set(s); + DEV wassert( s >= 0 ); + } + } + }; + }; } diff --git a/util/concurrency/shared_mutex_win.hpp b/util/concurrency/shared_mutex_win.hpp index 5356cf2..e850fc6 100755..100644 --- a/util/concurrency/shared_mutex_win.hpp +++ b/util/concurrency/shared_mutex_win.hpp @@ -7,10 +7,31 @@ // accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
-// MongoDB :
-//
-// Slightly modified boost file to not die above 127 pending writes
-//
+/* MongoDB :
+ Slightly modified boost file to not die above 127 pending writes
+ Here is what changed (from boost 1.42.0 shared_mutex.hpp):
+ 1,2c1,2
+ < #ifndef BOOST_THREAD_WIN32_SHARED_MUTEX_HPP
+ < #define BOOST_THREAD_WIN32_SHARED_MUTEX_HPP
+ ---
+ > #ifndef BOOST_THREAD_WIN32_SHARED_MUTEX_HPP_MODIFIED
+ > #define BOOST_THREAD_WIN32_SHARED_MUTEX_HPP_MODIFIED
+ 22c27
+ < class shared_mutex:
+ ---
+ > class modified_shared_mutex:
+ 73c78
+ < shared_mutex():
+ ---
+ > modified_shared_mutex():
+ 84c89
+ < ~shared_mutex()
+ ---
+ > ~modified_shared_mutex()
+ 283a289,290
+ > if( new_state.exclusive_waiting == 127 ) // the maximum already!
+ > break;
+*/
#include <boost/assert.hpp>
#include <boost/detail/interlocked.hpp>
diff --git a/util/concurrency/spin_lock.cpp b/util/concurrency/spin_lock.cpp index 0f33609..1811f15 100644 --- a/util/concurrency/spin_lock.cpp +++ b/util/concurrency/spin_lock.cpp @@ -25,20 +25,28 @@ namespace mongo { SpinLock::~SpinLock() { #if defined(_WIN32) DeleteCriticalSection(&_cs); +#elif defined(__USE_XOPEN2K) + pthread_spin_destroy(&_lock); #endif } SpinLock::SpinLock() -#if defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4) - : _locked( false ) { } -#elif defined(_WIN32) +#if defined(_WIN32) { InitializeCriticalSectionAndSpinCount(&_cs, 4000); } +#elif defined(__USE_XOPEN2K) + { pthread_spin_init( &_lock , 0 ); } +#elif defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4) + : _locked( false ) { } #else - : _mutex( "SpinLock" ) { } + : _mutex( "SpinLock" ) { } #endif void SpinLock::lock() { -#if defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4) +#if defined(_WIN32) + EnterCriticalSection(&_cs); +#elif defined(__USE_XOPEN2K) + pthread_spin_lock( &_lock ); +#elif defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4) // fast path if (!_locked && !__sync_lock_test_and_set(&_locked, true)) { return; @@ -55,8 +63,6 @@ namespace mongo { while (__sync_lock_test_and_set(&_locked, true)) { nanosleep(&t, NULL); } -#elif defined(_WIN32) - EnterCriticalSection(&_cs); #else // WARNING Missing spin lock in this platform. This can potentially // be slow. @@ -66,19 +72,28 @@ namespace mongo { } void SpinLock::unlock() { -#if defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4) - - __sync_lock_release(&_locked); - -#elif defined(WIN32) - +#if defined(_WIN32) LeaveCriticalSection(&_cs); - +#elif defined(__USE_XOPEN2K) + pthread_spin_unlock(&_lock); +#elif defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4) + __sync_lock_release(&_locked); #else - _mutex.unlock(); +#endif + } + bool SpinLock::isfast() { +#if defined(_WIN32) + return true; +#elif defined(__USE_XOPEN2K) + return true; +#elif defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4) + return true; +#else + return false; #endif } + } // namespace mongo diff --git a/util/concurrency/spin_lock.h b/util/concurrency/spin_lock.h index 02a8797..65ecb15 100644 --- a/util/concurrency/spin_lock.h +++ b/util/concurrency/spin_lock.h @@ -18,8 +18,7 @@ #pragma once -#include "pch.h" -#include "rwlock.h" +#include "mutex.h" namespace mongo { @@ -27,7 +26,7 @@ namespace mongo { * The spinlock currently requires late GCC support routines to be efficient. * Other platforms default to a mutex implemenation. */ - class SpinLock { + class SpinLock : boost::noncopyable { public: SpinLock(); ~SpinLock(); @@ -35,30 +34,30 @@ namespace mongo { void lock(); void unlock(); + static bool isfast(); // true if a real spinlock on this platform + private: -#if defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4) - volatile bool _locked; -#elif defined(_WIN32) +#if defined(_WIN32) CRITICAL_SECTION _cs; +#elif defined(__USE_XOPEN2K) + pthread_spinlock_t _lock; +#elif defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4) + volatile bool _locked; #else - // default to a scoped mutex if not implemented - RWLock _mutex; + // default to a mutex if not implemented + SimpleMutex _mutex; #endif - - // Non-copyable, non-assignable - SpinLock(SpinLock&); - SpinLock& operator=(SpinLock&); }; - struct scoped_spinlock { - scoped_spinlock( SpinLock& l ) : _l(l){ + class scoped_spinlock : boost::noncopyable { + public: + scoped_spinlock( SpinLock& l ) : _l(l) { _l.lock(); } ~scoped_spinlock() { - _l.unlock(); - } + _l.unlock();} + private: SpinLock& _l; }; } // namespace mongo - diff --git a/util/concurrency/synchronization.cpp b/util/concurrency/synchronization.cpp index 12e2894..ce2547c 100644 --- a/util/concurrency/synchronization.cpp +++ b/util/concurrency/synchronization.cpp @@ -20,7 +20,8 @@ namespace mongo { - Notification::Notification() : _mutex ( "Notification" ) , _notified( false ) { } + Notification::Notification() : _mutex ( "Notification" ) , _notified( false ) { + } Notification::~Notification() { } @@ -37,19 +38,40 @@ namespace mongo { _condition.notify_one(); } - NotifyAll::NotifyAll() : _mutex("NotifyAll"), _counter(0) { } + /* --- NotifyAll --- */ + + NotifyAll::NotifyAll() : _mutex("NotifyAll") { + _lastDone = 0; + _lastReturned = 0; + _nWaiting = 0; + } + + NotifyAll::When NotifyAll::now() { + scoped_lock lock( _mutex ); + return ++_lastReturned; + } + + void NotifyAll::waitFor(When e) { + scoped_lock lock( _mutex ); + ++_nWaiting; + while( _lastDone < e ) { + _condition.wait( lock.boost() ); + } + } - void NotifyAll::wait() { + void NotifyAll::awaitBeyondNow() { scoped_lock lock( _mutex ); - unsigned long long old = _counter; - while( old == _counter ) { + ++_nWaiting; + When e = ++_lastReturned; + while( _lastDone <= e ) { _condition.wait( lock.boost() ); } } - void NotifyAll::notifyAll() { + void NotifyAll::notifyAll(When e) { scoped_lock lock( _mutex ); - ++_counter; + _lastDone = e; + _nWaiting = 0; _condition.notify_all(); } diff --git a/util/concurrency/synchronization.h b/util/concurrency/synchronization.h index ac2fcab..a0e89f7 100644 --- a/util/concurrency/synchronization.h +++ b/util/concurrency/synchronization.h @@ -56,18 +56,30 @@ namespace mongo { public: NotifyAll(); + typedef unsigned long long When; + + When now(); + /** awaits the next notifyAll() call by another thread. notifications that precede this call are ignored -- we are looking for a fresh event. */ - void wait(); + void waitFor(When); + + /** a bit faster than waitFor( now() ) */ + void awaitBeyondNow(); /** may be called multiple times. notifies all waiters */ - void notifyAll(); + void notifyAll(When); + + /** indicates how many threads are waiting for a notify. */ + unsigned nWaiting() const { return _nWaiting; } private: mongo::mutex _mutex; - unsigned long long _counter; boost::condition _condition; + When _lastDone; + When _lastReturned; + unsigned _nWaiting; }; } // namespace mongo diff --git a/util/concurrency/value.h b/util/concurrency/value.h index 0a0ef85..c66977b 100644 --- a/util/concurrency/value.h +++ b/util/concurrency/value.h @@ -1,5 +1,5 @@ /* @file value.h - concurrency helpers Atomic<T> and DiagStr + concurrency helpers DiagStr, Guarded */ /** @@ -20,44 +20,29 @@ #pragma once +#include "mutex.h" + namespace mongo { - extern mutex _atomicMutex; + /** declare that a variable that is "guarded" by a mutex. - /** atomic wrapper for a value. enters a mutex on each access. must - be copyable. - */ - template<typename T> - class Atomic : boost::noncopyable { - T val; - public: - Atomic<T>() { } + The decl documents the rule. For example "counta and countb are guarded by xyzMutex": - void operator=(const T& a) { - scoped_lock lk(_atomicMutex); - val = a; - } + Guarded<int, xyzMutex> counta; + Guarded<int, xyzMutex> countb; - operator T() const { - scoped_lock lk(_atomicMutex); - return val; + Upon use, specify the scoped_lock object. This makes it hard for someone + later to forget to be in the lock. Check is made that it is the right lock in _DEBUG + builds at runtime. + */ + template <typename T, mutex& BY> + class Guarded { + T _val; + public: + T& ref(const scoped_lock& lk) { + dassert( lk._mut == &BY ); + return _val; } - - /** example: - Atomic<int> q; - ... - { - Atomic<int>::tran t(q); - if( q.ref() > 0 ) - q.ref()--; - } - */ - class tran : private scoped_lock { - Atomic<T>& _a; - public: - tran(Atomic<T>& a) : scoped_lock(_atomicMutex), _a(a) { } - T& ref() { return _a.val; } - }; }; class DiagStr { diff --git a/util/concurrency/vars.cpp b/util/concurrency/vars.cpp index 19b58eb..213e576 100644 --- a/util/concurrency/vars.cpp +++ b/util/concurrency/vars.cpp @@ -17,15 +17,13 @@ */ #include "pch.h" -#include "value.h" #include "mutex.h" +#include "value.h" namespace mongo { mutex DiagStr::m("diags"); - mongo::mutex _atomicMutex("_atomicMutex"); - // intentional leak. otherwise destructor orders can be problematic at termination. MutexDebugger &mutexDebugger = *(new MutexDebugger()); diff --git a/util/file.h b/util/file.h index 0a973e3..368e692 100644 --- a/util/file.h +++ b/util/file.h @@ -23,10 +23,8 @@ #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> -#else -#include <windows.h> +#include <sys/statvfs.h> #endif - #include "text.h" namespace mongo { @@ -37,6 +35,8 @@ namespace mongo { typedef boost::uint64_t fileofs; #endif + /* NOTE: not thread-safe. (at least the windows implementation isn't. */ + class FileInterface { public: void open(const char *fn) {} @@ -46,6 +46,12 @@ namespace mongo { bool is_open() {return false;} fileofs len() { return 0; } void fsync() { assert(false); } + + // shrink file to size bytes. No-op if file already smaller. + void truncate(fileofs size); + + /** @return -1 if error or unavailable */ + static boost::intmax_t freeSpace(const string &path) { assert(false); return -1; } }; #if defined(_WIN32) @@ -54,10 +60,11 @@ namespace mongo { class File : public FileInterface { HANDLE fd; bool _bad; + string _name; void err(BOOL b=false) { /* false = error happened */ if( !b && !_bad ) { _bad = true; - log() << "File I/O error " << GetLastError() << '\n'; + log() << "File " << _name << "I/O error " << GetLastError() << '\n'; } } public: @@ -69,7 +76,8 @@ namespace mongo { if( is_open() ) CloseHandle(fd); fd = INVALID_HANDLE_VALUE; } - void open(const char *filename, bool readOnly=false ) { + void open(const char *filename, bool readOnly=false , bool direct=false) { + _name = filename; fd = CreateFile( toNativeString(filename).c_str(), ( readOnly ? 0 : GENERIC_WRITE ) | GENERIC_READ, FILE_SHARE_WRITE|FILE_SHARE_READ, @@ -81,6 +89,15 @@ namespace mongo { else _bad = false; } + static boost::intmax_t freeSpace(const string &path) { + ULARGE_INTEGER avail; + if( GetDiskFreeSpaceEx(toNativeString(path.c_str()).c_str(), &avail, NULL, NULL) ) { + return avail.QuadPart; + } + DWORD e = GetLastError(); + log() << "GetDiskFreeSpaceEx fails errno: " << e << endl; + return -1; + } void write(fileofs o, const char *data, unsigned len) { LARGE_INTEGER li; li.QuadPart = o; @@ -111,6 +128,20 @@ namespace mongo { return li.QuadPart; } void fsync() { FlushFileBuffers(fd); } + + void truncate(fileofs size) { + if (len() <= size) + return; + + LARGE_INTEGER li; + li.QuadPart = size; + if (SetFilePointerEx(fd, li, NULL, FILE_BEGIN) == 0){ + err(false); + return; //couldn't seek + } + + err(SetEndOfFile(fd)); + } }; #else @@ -140,9 +171,13 @@ namespace mongo { #define O_NOATIME 0 #endif - void open(const char *filename, bool readOnly=false ) { + void open(const char *filename, bool readOnly=false , bool direct=false) { fd = ::open(filename, - O_CREAT | ( readOnly ? 0 : ( O_RDWR | O_NOATIME ) ) , + O_CREAT | ( readOnly ? 0 : ( O_RDWR | O_NOATIME ) ) +#if defined(O_DIRECT) + | ( direct ? O_DIRECT : 0 ) +#endif + , S_IRUSR | S_IWUSR); if ( fd <= 0 ) { out() << "couldn't open " << filename << ' ' << errnoWithDescription() << endl; @@ -154,14 +189,37 @@ namespace mongo { err( ::pwrite(fd, data, len, o) == (int) len ); } void read(fileofs o, char *data, unsigned len) { - err( ::pread(fd, data, len, o) == (int) len ); + ssize_t s = ::pread(fd, data, len, o); + if( s == -1 ) { + err(false); + } + else if( s != (int) len ) { + _bad = true; + log() << "File error read:" << s << " bytes, wanted:" << len << " ofs:" << o << endl; + } } bool bad() { return _bad; } bool is_open() { return fd > 0; } fileofs len() { - return lseek(fd, 0, SEEK_END); + off_t o = lseek(fd, 0, SEEK_END); + if( o != (off_t) -1 ) + return o; + err(false); + return 0; } void fsync() { ::fsync(fd); } + static boost::intmax_t freeSpace ( const string &path ) { + struct statvfs info; + assert( !statvfs( path.c_str() , &info ) ); + return boost::intmax_t( info.f_bavail ) * info.f_frsize; + } + + void truncate(fileofs size) { + if (len() <= size) + return; + + err(ftruncate(fd, size) == 0); + } }; diff --git a/util/file_allocator.cpp b/util/file_allocator.cpp index 54590ed..b0572f9 100644 --- a/util/file_allocator.cpp +++ b/util/file_allocator.cpp @@ -32,19 +32,22 @@ using namespace mongoutils; #endif #include "file_allocator.h" +#include "paths.h" namespace mongo { - void ensureParentDirCreated(const boost::filesystem::path& p){ + boost::filesystem::path ensureParentDirCreated(const boost::filesystem::path& p){ const boost::filesystem::path parent = p.branch_path(); - + if (! boost::filesystem::exists(parent)){ ensureParentDirCreated(parent); log() << "creating directory " << parent.string() << endl; boost::filesystem::create_directory(parent); + flushMyDirectory(parent); // flushes grandparent to ensure parent exists after crash } - + assert(boost::filesystem::is_directory(parent)); + return parent; } #if defined(_WIN32) @@ -74,6 +77,10 @@ namespace mongo { // TODO : we should to avoid fragmentation } + bool FileAllocator::hasFailed() const { + return false; + } + #else FileAllocator::FileAllocator() @@ -174,6 +181,10 @@ namespace mongo { } } + bool FileAllocator::hasFailed() const { + return _failed; + } + void FileAllocator::checkFailure() { if (_failed) { // we want to log the problem (diskfull.js expects it) but we do not want to dump a stack tracke @@ -197,6 +208,19 @@ namespace mongo { return false; } + string makeTempFileName( path root ) { + while( 1 ) { + path p = root / "_tmp"; + stringstream ss; + ss << (unsigned) rand(); + p /= ss.str(); + string fn = p.string(); + if( !boost::filesystem::exists(p) ) + return fn; + } + return ""; + } + void FileAllocator::run( FileAllocator * fa ) { setThreadName( "FileAllocator" ); while( 1 ) { @@ -215,19 +239,25 @@ namespace mongo { name = fa->_pending.front(); size = fa->_pendingSize[ name ]; } + + string tmp; + long fd = 0; try { log() << "allocating new datafile " << name << ", filling with zeroes..." << endl; - ensureParentDirCreated(name); - long fd = open(name.c_str(), O_CREAT | O_RDWR | O_NOATIME, S_IRUSR | S_IWUSR); + + boost::filesystem::path parent = ensureParentDirCreated(name); + tmp = makeTempFileName( parent ); + ensureParentDirCreated(tmp); + + fd = open(tmp.c_str(), O_CREAT | O_RDWR | O_NOATIME, S_IRUSR | S_IWUSR); if ( fd <= 0 ) { - stringstream ss; - ss << "FileAllocator: couldn't open " << name << ' ' << errnoWithDescription(); - uassert( 10439 , ss.str(), fd <= 0 ); + log() << "FileAllocator: couldn't create " << name << " (" << tmp << ") " << errnoWithDescription() << endl; + uasserted(10439, ""); } #if defined(POSIX_FADV_DONTNEED) if( posix_fadvise(fd, 0, size, POSIX_FADV_DONTNEED) ) { - log() << "warning: posix_fadvise fails " << name << ' ' << errnoWithDescription() << endl; + log() << "warning: posix_fadvise fails " << name << " (" << tmp << ") " << errnoWithDescription() << endl; } #endif @@ -236,18 +266,32 @@ namespace mongo { /* make sure the file is the full desired length */ ensureLength( fd , size ); + close( fd ); + fd = 0; + + if( rename(tmp.c_str(), name.c_str()) ) { + log() << "error: couldn't rename " << tmp << " to " << name << ' ' << errnoWithDescription() << endl; + uasserted(13653, ""); + } + flushMyDirectory(name); + log() << "done allocating datafile " << name << ", " << "size: " << size/1024/1024 << "MB, " << " took " << ((double)t.millis())/1000.0 << " secs" << endl; - close( fd ); - + // no longer in a failed state. allow new writers. + fa->_failed = false; } catch ( ... ) { + if ( fd > 0 ) + close( fd ); log() << "error failed to allocate new file: " << name - << " size: " << size << ' ' << errnoWithDescription() << endl; + << " size: " << size << ' ' << errnoWithDescription() << warnings; + log() << " will try again in 10 seconds" << endl; // not going to warning logs try { + if ( tmp.size() ) + BOOST_CHECK_EXCEPTION( boost::filesystem::remove( tmp ) ); BOOST_CHECK_EXCEPTION( boost::filesystem::remove( name ) ); } catch ( ... ) { @@ -256,7 +300,10 @@ namespace mongo { fa->_failed = true; // not erasing from pending fa->_pendingUpdated.notify_all(); - return; // no more allocation + + + sleepsecs(10); + continue; } { diff --git a/util/file_allocator.h b/util/file_allocator.h index 6cc7b2d..7c3cacb 100644 --- a/util/file_allocator.h +++ b/util/file_allocator.h @@ -47,12 +47,14 @@ namespace mongo { void allocateAsap( const string &name, unsigned long long &size ); void waitUntilFinished() const; + + bool hasFailed() const; static void ensureLength(int fd , long size); /** @return the singletone */ static FileAllocator * get(); - + private: FileAllocator(); @@ -84,6 +86,6 @@ namespace mongo { }; /** like "mkdir -p" but on parent dir of p rather than p itself */ - void ensureParentDirCreated(const boost::filesystem::path& p); + boost::filesystem::path ensureParentDirCreated(const boost::filesystem::path& p); } // namespace mongo diff --git a/util/goodies.h b/util/goodies.h index 53a74c2..65bfbab 100644 --- a/util/goodies.h +++ b/util/goodies.h @@ -109,49 +109,12 @@ namespace mongo { // PRINTFL; prints file:line #define MONGO_PRINTFL cout << __FILE__ ":" << __LINE__ << endl #define PRINTFL MONGO_PRINTFL +#define MONGO_FLOG log() << __FILE__ ":" << __LINE__ << endl +#define FLOG MONGO_FLOG #undef assert #define assert MONGO_assert - struct WrappingInt { - WrappingInt() { - x = 0; - } - WrappingInt(unsigned z) : x(z) { } - unsigned x; - operator unsigned() const { - return x; - } - - - static int diff(unsigned a, unsigned b) { - return a-b; - } - bool operator<=(WrappingInt r) { - // platform dependent - int df = (r.x - x); - return df >= 0; - } - bool operator>(WrappingInt r) { - return !(r<=*this); - } - }; - - /* - - class DebugMutex : boost::noncopyable { - friend class lock; - mongo::mutex m; - int locked; - public: - DebugMutex() : locked(0); { } - bool isLocked() { return locked; } - }; - - */ - -//typedef scoped_lock lock; - inline bool startsWith(const char *str, const char *prefix) { size_t l = strlen(prefix); if ( strlen(str) < l ) return false; @@ -236,6 +199,7 @@ namespace mongo { _active = 0; } + // typically you do ProgressMeterHolder void reset( unsigned long long total , int secondsBetween = 3 , int checkInterval = 100 ) { _total = total; _secondsBetween = secondsBetween; @@ -257,6 +221,7 @@ namespace mongo { } /** + * @param n how far along we are relative to the total # we set in CurOp::setMessage * @return if row was printed */ bool hit( int n = 1 ) { @@ -282,13 +247,15 @@ namespace mongo { return true; } - unsigned long long done() { - return _done; + void setTotalWhileRunning( unsigned long long total ) { + _total = total; } - unsigned long long hits() { - return _hits; - } + unsigned long long done() const { return _done; } + + unsigned long long hits() const { return _hits; } + + unsigned long long total() const { return _total; } string toString() const { if ( ! _active ) @@ -314,6 +281,10 @@ namespace mongo { int _lastTime; }; + // e.g.: + // CurOp * op = cc().curop(); + // ProgressMeterHolder pm( op->setMessage( "index: (1/3) external sort" , d->stats.nrecords , 10 ) ); + // loop { pm.hit(); } class ProgressMeterHolder : boost::noncopyable { public: ProgressMeterHolder( ProgressMeter& pm ) @@ -417,7 +388,7 @@ namespace mongo { class ThreadSafeString { public: ThreadSafeString( size_t size=256 ) - : _size( 256 ) , _buf( new char[256] ) { + : _size( size ) , _buf( new char[size] ) { memset( _buf , 0 , _size ); } @@ -468,97 +439,6 @@ namespace mongo { ostream& operator<<( ostream &s, const ThreadSafeString &o ); - inline bool isNumber( char c ) { - return c >= '0' && c <= '9'; - } - - inline unsigned stringToNum(const char *str) { - unsigned x = 0; - const char *p = str; - while( 1 ) { - if( !isNumber(*p) ) { - if( *p == 0 && p != str ) - break; - throw 0; - } - x = x * 10 + *p++ - '0'; - } - return x; - } - - // for convenience, '{' is greater than anything and stops number parsing - inline int lexNumCmp( const char *s1, const char *s2 ) { - //cout << "START : " << s1 << "\t" << s2 << endl; - while( *s1 && *s2 ) { - - bool p1 = ( *s1 == (char)255 ); - bool p2 = ( *s2 == (char)255 ); - //cout << "\t\t " << p1 << "\t" << p2 << endl; - if ( p1 && !p2 ) - return 1; - if ( p2 && !p1 ) - return -1; - - bool n1 = isNumber( *s1 ); - bool n2 = isNumber( *s2 ); - - if ( n1 && n2 ) { - // get rid of leading 0s - while ( *s1 == '0' ) s1++; - while ( *s2 == '0' ) s2++; - - char * e1 = (char*)s1; - char * e2 = (char*)s2; - - // find length - // if end of string, will break immediately ('\0') - while ( isNumber (*e1) ) e1++; - while ( isNumber (*e2) ) e2++; - - int len1 = (int)(e1-s1); - int len2 = (int)(e2-s2); - - int result; - // if one is longer than the other, return - if ( len1 > len2 ) { - return 1; - } - else if ( len2 > len1 ) { - return -1; - } - // if the lengths are equal, just strcmp - else if ( (result = strncmp(s1, s2, len1)) != 0 ) { - return result; - } - - // otherwise, the numbers are equal - s1 = e1; - s2 = e2; - continue; - } - - if ( n1 ) - return 1; - - if ( n2 ) - return -1; - - if ( *s1 > *s2 ) - return 1; - - if ( *s2 > *s1 ) - return -1; - - s1++; s2++; - } - - if ( *s1 ) - return 1; - if ( *s2 ) - return -1; - return 0; - } - /** A generic pointer type for function arguments. * It will convert from any pointer type except auto_ptr. * Semantics are the same as passing the pointer returned from get() @@ -597,6 +477,8 @@ namespace mongo { T* _p; }; + + /** Hmmmm */ using namespace boost; diff --git a/util/hashtab.h b/util/hashtab.h index 6818bef..f1a3306 100644 --- a/util/hashtab.h +++ b/util/hashtab.h @@ -54,7 +54,7 @@ namespace mongo { } }; void* _buf; - int n; + int n; // number of hashtable buckets int maxChain; Node& nodes(int i) { @@ -156,9 +156,9 @@ namespace mongo { typedef void (*IteratorCallback)( const Key& k , Type& v ); void iterAll( IteratorCallback callback ) { for ( int i=0; i<n; i++ ) { - if ( ! nodes(i).inUse() ) - continue; - callback( nodes(i).k , nodes(i).value ); + if ( nodes(i).inUse() ) { + callback( nodes(i).k , nodes(i).value ); + } } } @@ -166,9 +166,9 @@ namespace mongo { typedef void (*IteratorCallback2)( const Key& k , Type& v , void * extra ); void iterAll( IteratorCallback2 callback , void * extra ) { for ( int i=0; i<n; i++ ) { - if ( ! nodes(i).inUse() ) - continue; - callback( nodes(i).k , nodes(i).value , extra ); + if ( nodes(i).inUse() ) { + callback( nodes(i).k , nodes(i).value , extra ); + } } } diff --git a/util/log.cpp b/util/log.cpp index eb1cbae..bc48584 100644 --- a/util/log.cpp +++ b/util/log.cpp @@ -19,16 +19,18 @@ #include "pch.h" #include "assert_util.h" #include "assert.h" -//#include "file.h" #include <cmath> +#include "time_support.h" using namespace std; -#ifndef _WIN32 -#include <cxxabi.h> -#include <sys/file.h> +#ifdef _WIN32 +# include <io.h> +#else +# include <cxxabi.h> +# include <sys/file.h> #endif -#include "../db/jsobj.h" +//#include "../db/jsobj.h" namespace mongo { @@ -47,6 +49,8 @@ namespace mongo { uassert( 10268 , "LoggingManager already started" , ! _enabled ); _append = append; + bool exists = boost::filesystem::exists(lp); + // test path FILE * test = fopen( lp.c_str() , _append ? "a" : "w" ); if ( ! test ) { @@ -59,6 +63,14 @@ namespace mongo { dbexit( EXIT_BADOPTIONS ); assert( 0 ); } + + if (append && exists){ + // two blank lines before and after + const string msg = "\n\n***** SERVER RESTARTED *****\n\n\n"; + massert(14036, errnoWithPrefix("couldn't write to log file"), + fwrite(msg.data(), 1, msg.size(), test) == msg.size()); + } + fclose( test ); _path = lp; @@ -74,7 +86,7 @@ namespace mongo { if ( _file ) { #ifdef _WIN32 - cout << "log rotation doesn't work on windows" << endl; + cout << "log rotation net yet supported on windows" << endl; return; #else struct tm t; @@ -95,8 +107,21 @@ namespace mongo { assert(0); } +#ifdef _WIN32 // windows has these functions it just gives them a funny name +# define dup2 _dup2 +# define fileno _fileno +#endif + // redirect stderr to log file + dup2(fileno(tmp), 2); + Logstream::setLogFile(tmp); // after this point no thread will be using old file +#if 0 // enable to test redirection + cout << "written to cout" << endl; + cerr << "written to cerr" << endl; + log() << "written to log()" << endl; +#endif + _file = tmp; _opened = time(0); } @@ -125,4 +150,3 @@ namespace mongo { FILE* Logstream::logfile = stdout; } - @@ -46,6 +46,39 @@ namespace mongo { } } + class LabeledLevel { + public: + + LabeledLevel( int level ) : _level( level ) {} + LabeledLevel( const char* label, int level ) : _label( label ), _level( level ) {} + LabeledLevel( const string& label, int level ) : _label( label ), _level( level ) {} + + LabeledLevel operator+( int i ) const { + return LabeledLevel( _label, _level + i ); + } + + LabeledLevel operator+( const char* label ) const { + if( _label == "" ) + return LabeledLevel( label, _level ); + return LabeledLevel( _label + string("::") + label, _level ); + } + + LabeledLevel operator+( string& label ) const { + return LabeledLevel( _label + string("::") + label, _level ); + } + + LabeledLevel operator-( int i ) const { + return LabeledLevel( _label, _level - i ); + } + + const string& getLabel() const { return _label; } + int getLevel() const { return _level; } + + private: + string _label; + int _level; + }; + class LazyString { public: virtual ~LazyString() {} @@ -104,6 +137,9 @@ namespace mongo { virtual Nullstream& operator<<(unsigned) { return *this; } + virtual Nullstream& operator<<(unsigned short) { + return *this; + } virtual Nullstream& operator<<(double) { return *this; } @@ -209,6 +245,7 @@ namespace mongo { Logstream& operator<<(long x) { ss << x; return *this; } Logstream& operator<<(unsigned long x) { ss << x; return *this; } Logstream& operator<<(unsigned x) { ss << x; return *this; } + Logstream& operator<<(unsigned short x){ ss << x; return *this; } Logstream& operator<<(double x) { ss << x; return *this; } Logstream& operator<<(void *x) { ss << x; return *this; } Logstream& operator<<(const void *x) { ss << x; return *this; } @@ -261,6 +298,9 @@ namespace mongo { } public: static Logstream& get() { + if ( StaticObserver::_destroyingStatics ) { + cout << "Logstream::get called in uninitialized state" << endl; + } Logstream *p = tsp.get(); if( p == 0 ) tsp.reset( p = new Logstream() ); @@ -291,7 +331,7 @@ namespace mongo { return Logstream::get(); } - /** logging which we may not want during unit tests runs. + /** logging which we may not want during unit tests (dbtests) runs. set tlogLevel to -1 to suppress tlog() output in a test program. */ inline Nullstream& tlog( int level = 0 ) { if ( level > tlogLevel || level > logLevel ) @@ -305,13 +345,19 @@ namespace mongo { return Logstream::get().prolog(); } -#define MONGO_LOG(level) if ( logLevel >= (level) ) log( level ) +#define MONGO_LOG(level) if ( MONGO_unlikely(logLevel >= (level)) ) log( level ) #define LOG MONGO_LOG inline Nullstream& log( LogLevel l ) { return Logstream::get().prolog().setLogLevel( l ); } + inline Nullstream& log( const LabeledLevel& ll ) { + Nullstream& stream = log( ll.getLevel() ); + if( ll.getLabel() != "" ) + stream << "[" << ll.getLabel() << "] "; + return stream; + } inline Nullstream& log() { return Logstream::get().prolog(); @@ -392,7 +438,6 @@ namespace mongo { string errnoWithPrefix( const char * prefix ); void Logstream::logLockless( const StringData& s ) { - if ( s.size() == 0 ) return; @@ -475,4 +520,6 @@ namespace mongo { } }; + extern Tee* const warnings; // Things put here go in serverStatus + } // namespace mongo diff --git a/util/logfile.cpp b/util/logfile.cpp index 0386a59..609edb8 100644 --- a/util/logfile.cpp +++ b/util/logfile.cpp @@ -77,18 +77,35 @@ namespace mongo { CloseHandle(_fd); } - void LogFile::synchronousAppend(const void *buf, size_t len) { - assert(_fd); - DWORD written; - if( !WriteFile(_fd, buf, len, &written, NULL) ) { - DWORD e = GetLastError(); - if( e == 87 ) - massert(13519, "error appending to file - misaligned direct write?", false); - else - uasserted(13517, str::stream() << "error appending to file " << errnoWithDescription(e)); + void LogFile::truncate() { + verify(15870, _fd != INVALID_HANDLE_VALUE); + + if (!SetEndOfFile(_fd)){ + msgasserted(15871, "Couldn't truncate file: " + errnoWithDescription()); } - else { - dassert( written == len ); + } + + void LogFile::synchronousAppend(const void *_buf, size_t _len) { + const size_t BlockSize = 8 * 1024 * 1024; + assert(_fd); + assert(_len % 4096 == 0); + const char *buf = (const char *) _buf; + size_t left = _len; + while( left ) { + size_t toWrite = min(left, BlockSize); + DWORD written; + if( !WriteFile(_fd, buf, toWrite, &written, NULL) ) { + DWORD e = GetLastError(); + if( e == 87 ) + msgasserted(13519, "error 87 appending to file - invalid parameter"); + else + uasserted(13517, str::stream() << "error appending to file " << _name << ' ' << _len << ' ' << toWrite << ' ' << errnoWithDescription(e)); + } + else { + dassert( written == toWrite ); + } + left -= written; + buf += written; } } @@ -96,28 +113,44 @@ namespace mongo { #else +// posix + #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> +#include "paths.h" namespace mongo { LogFile::LogFile(string name) : _name(name) { - _fd = open(name.c_str(), - O_CREAT - | O_WRONLY + int options = O_CREAT + | O_WRONLY #if defined(O_DIRECT) - | O_DIRECT + | O_DIRECT #endif #if defined(O_NOATIME) - | O_NOATIME + | O_NOATIME +#endif + ; + + _fd = open(name.c_str(), options, S_IRUSR | S_IWUSR); + +#if defined(O_DIRECT) + _direct = true; + if( _fd < 0 ) { + _direct = false; + options &= ~O_DIRECT; + _fd = open(name.c_str(), options, S_IRUSR | S_IWUSR); + } +#else + _direct = false; #endif - , - S_IRUSR | S_IWUSR); + if( _fd < 0 ) { uasserted(13516, str::stream() << "couldn't open file " << name << " for writing " << errnoWithDescription()); } + flushMyDirectory(name); } LogFile::~LogFile() { @@ -126,7 +159,21 @@ namespace mongo { _fd = -1; } + void LogFile::truncate() { + verify(15872, _fd >= 0); + + BOOST_STATIC_ASSERT(sizeof(off_t) == 8); // we don't want overflow here + const off_t pos = lseek(_fd, 0, SEEK_CUR); // doesn't actually seek + if (ftruncate(_fd, pos) != 0){ + msgasserted(15873, "Couldn't truncate file: " + errnoWithDescription()); + } + } + void LogFile::synchronousAppend(const void *b, size_t len) { +#ifdef POSIX_FADV_DONTNEED + const off_t pos = lseek(_fd, 0, SEEK_CUR); // doesn't actually seek +#endif + const char *buf = (char *) b; assert(_fd); assert(((size_t)buf)%4096==0); // aligned @@ -150,6 +197,10 @@ namespace mongo { uasserted(13514, str::stream() << "error appending to file on fsync " << ' ' << errnoWithDescription()); } +#ifdef POSIX_FADV_DONTNEED + if (!_direct) + posix_fadvise(_fd, pos, len, POSIX_FADV_DONTNEED); +#endif } } diff --git a/util/logfile.h b/util/logfile.h index 9085161..f6d1c94 100644 --- a/util/logfile.h +++ b/util/logfile.h @@ -38,6 +38,8 @@ namespace mongo { const string _name; + void truncate(); // Removes extra data after current position + private: #if defined(_WIN32) typedef HANDLE fd_type; @@ -45,6 +47,7 @@ namespace mongo { typedef int fd_type; #endif fd_type _fd; + bool _direct; // are we using direct I/O }; } diff --git a/util/message.cpp b/util/message.cpp deleted file mode 100644 index bcb1772..0000000 --- a/util/message.cpp +++ /dev/null @@ -1,764 +0,0 @@ -/* message - - todo: authenticate; encrypt? -*/ - -/* Copyright 2009 10gen Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "pch.h" -#include "message.h" -#include <time.h> -#include "../util/goodies.h" -#include "../util/background.h" -#include <fcntl.h> -#include <errno.h> -#include "../db/cmdline.h" -#include "../client/dbclient.h" -#include "../util/time_support.h" - -#ifndef _WIN32 -# ifndef __sunos__ -# include <ifaddrs.h> -# endif -# include <sys/resource.h> -# include <sys/stat.h> -#else - -// errno doesn't work for winsock. -#undef errno -#define errno WSAGetLastError() - -#endif - -namespace mongo { - - bool noUnixSocket = false; - - bool objcheck = false; - - void checkTicketNumbers(); - -// if you want trace output: -#define mmm(x) - -#ifdef MSG_NOSIGNAL - const int portSendFlags = MSG_NOSIGNAL; - const int portRecvFlags = MSG_NOSIGNAL; -#else - const int portSendFlags = 0; - const int portRecvFlags = 0; -#endif - - const Listener* Listener::_timeTracker; - - string SocketException::toString() const { - stringstream ss; - ss << _ei.code << " socket exception [" << _type << "] "; - - if ( _server.size() ) - ss << "server [" << _server << "] "; - - if ( _extra.size() ) - ss << _extra; - - return ss.str(); - } - - - vector<SockAddr> ipToAddrs(const char* ips, int port) { - vector<SockAddr> out; - if (*ips == '\0') { - out.push_back(SockAddr("0.0.0.0", port)); // IPv4 all - - if (IPv6Enabled()) - out.push_back(SockAddr("::", port)); // IPv6 all -#ifndef _WIN32 - if (!noUnixSocket) - out.push_back(SockAddr(makeUnixSockPath(port).c_str(), port)); // Unix socket -#endif - return out; - } - - while(*ips) { - string ip; - const char * comma = strchr(ips, ','); - if (comma) { - ip = string(ips, comma - ips); - ips = comma + 1; - } - else { - ip = string(ips); - ips = ""; - } - - SockAddr sa(ip.c_str(), port); - out.push_back(sa); - -#ifndef _WIN32 - if (!noUnixSocket && (sa.getAddr() == "127.0.0.1" || sa.getAddr() == "0.0.0.0")) // only IPv4 - out.push_back(SockAddr(makeUnixSockPath(port).c_str(), port)); -#endif - } - return out; - - } - - /* listener ------------------------------------------------------------------- */ - - void Listener::initAndListen() { - checkTicketNumbers(); - vector<SockAddr> mine = ipToAddrs(_ip.c_str(), _port); - vector<int> socks; - SOCKET maxfd = 0; // needed for select() - - for (vector<SockAddr>::iterator it=mine.begin(), end=mine.end(); it != end; ++it) { - SockAddr& me = *it; - - SOCKET sock = ::socket(me.getType(), SOCK_STREAM, 0); - if ( sock == INVALID_SOCKET ) { - log() << "ERROR: listen(): invalid socket? " << errnoWithDescription() << endl; - } - - if (me.getType() == AF_UNIX) { -#if !defined(_WIN32) - if (unlink(me.getAddr().c_str()) == -1) { - int x = errno; - if (x != ENOENT) { - log() << "couldn't unlink socket file " << me << errnoWithDescription(x) << " skipping" << endl; - continue; - } - } -#endif - } - else if (me.getType() == AF_INET6) { - // IPv6 can also accept IPv4 connections as mapped addresses (::ffff:127.0.0.1) - // That causes a conflict if we don't do set it to IPV6_ONLY - const int one = 1; - setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (const char*) &one, sizeof(one)); - } - - prebindOptions( sock ); - - if ( ::bind(sock, me.raw(), me.addressSize) != 0 ) { - int x = errno; - log() << "listen(): bind() failed " << errnoWithDescription(x) << " for socket: " << me.toString() << endl; - if ( x == EADDRINUSE ) - log() << " addr already in use" << endl; - closesocket(sock); - return; - } - -#if !defined(_WIN32) - if (me.getType() == AF_UNIX) { - if (chmod(me.getAddr().c_str(), 0777) == -1) { - log() << "couldn't chmod socket file " << me << errnoWithDescription() << endl; - } - - ListeningSockets::get()->addPath( me.getAddr() ); - } -#endif - - if ( ::listen(sock, 128) != 0 ) { - log() << "listen(): listen() failed " << errnoWithDescription() << endl; - closesocket(sock); - return; - } - - ListeningSockets::get()->add( sock ); - - socks.push_back(sock); - if (sock > maxfd) - maxfd = sock; - } - - static long connNumber = 0; - struct timeval maxSelectTime; - while ( ! inShutdown() ) { - fd_set fds[1]; - FD_ZERO(fds); - - for (vector<int>::iterator it=socks.begin(), end=socks.end(); it != end; ++it) { - FD_SET(*it, fds); - } - - maxSelectTime.tv_sec = 0; - maxSelectTime.tv_usec = 10000; - const int ret = select(maxfd+1, fds, NULL, NULL, &maxSelectTime); - - if (ret == 0) { -#if defined(__linux__) - _elapsedTime += ( 10000 - maxSelectTime.tv_usec ) / 1000; -#else - _elapsedTime += 10; -#endif - continue; - } - _elapsedTime += ret; // assume 1ms to grab connection. very rough - - if (ret < 0) { - int x = errno; -#ifdef EINTR - if ( x == EINTR ) { - log() << "select() signal caught, continuing" << endl; - continue; - } -#endif - if ( ! inShutdown() ) - log() << "select() failure: ret=" << ret << " " << errnoWithDescription(x) << endl; - return; - } - - for (vector<int>::iterator it=socks.begin(), end=socks.end(); it != end; ++it) { - if (! (FD_ISSET(*it, fds))) - continue; - - SockAddr from; - int s = accept(*it, from.raw(), &from.addressSize); - if ( s < 0 ) { - int x = errno; // so no global issues - if ( x == ECONNABORTED || x == EBADF ) { - log() << "Listener on port " << _port << " aborted" << endl; - return; - } - if ( x == 0 && inShutdown() ) { - return; // socket closed - } - if( !inShutdown() ) - log() << "Listener: accept() returns " << s << " " << errnoWithDescription(x) << endl; - continue; - } - if (from.getType() != AF_UNIX) - disableNagle(s); - if ( _logConnect && ! cmdLine.quiet ) - log() << "connection accepted from " << from.toString() << " #" << ++connNumber << endl; - accepted(s, from); - } - } - } - - void Listener::accepted(int sock, const SockAddr& from) { - accepted( new MessagingPort(sock, from) ); - } - - /* messagingport -------------------------------------------------------------- */ - - class PiggyBackData { - public: - PiggyBackData( MessagingPort * port ) { - _port = port; - _buf = new char[1300]; - _cur = _buf; - } - - ~PiggyBackData() { - DESTRUCTOR_GUARD ( - flush(); - delete[]( _cur ); - ); - } - - void append( Message& m ) { - assert( m.header()->len <= 1300 ); - - if ( len() + m.header()->len > 1300 ) - flush(); - - memcpy( _cur , m.singleData() , m.header()->len ); - _cur += m.header()->len; - } - - void flush() { - if ( _buf == _cur ) - return; - - _port->send( _buf , len(), "flush" ); - _cur = _buf; - } - - int len() const { return _cur - _buf; } - - private: - MessagingPort* _port; - char * _buf; - char * _cur; - }; - - class Ports { - set<MessagingPort*> ports; - mongo::mutex m; - public: - Ports() : ports(), m("Ports") {} - void closeAll(unsigned skip_mask) { - scoped_lock bl(m); - for ( set<MessagingPort*>::iterator i = ports.begin(); i != ports.end(); i++ ) { - if( (*i)->tag & skip_mask ) - continue; - (*i)->shutdown(); - } - } - void insert(MessagingPort* p) { - scoped_lock bl(m); - ports.insert(p); - } - void erase(MessagingPort* p) { - scoped_lock bl(m); - ports.erase(p); - } - }; - - // we "new" this so it is still be around when other automatic global vars - // are being destructed during termination. - Ports& ports = *(new Ports()); - - void MessagingPort::closeAllSockets(unsigned mask) { - ports.closeAll(mask); - } - - MessagingPort::MessagingPort(int _sock, const SockAddr& _far) : sock(_sock), piggyBackData(0), _bytesIn(0), _bytesOut(0), farEnd(_far), _timeout(), tag(0) { - _logLevel = 0; - ports.insert(this); - } - - MessagingPort::MessagingPort( double timeout, int ll ) : _bytesIn(0), _bytesOut(0), tag(0) { - _logLevel = ll; - ports.insert(this); - sock = -1; - piggyBackData = 0; - _timeout = timeout; - } - - void MessagingPort::shutdown() { - if ( sock >= 0 ) { - closesocket(sock); - sock = -1; - } - } - - MessagingPort::~MessagingPort() { - if ( piggyBackData ) - delete( piggyBackData ); - shutdown(); - ports.erase(this); - } - - class ConnectBG : public BackgroundJob { - public: - ConnectBG(int sock, SockAddr farEnd) : _sock(sock), _farEnd(farEnd) { } - - void run() { _res = ::connect(_sock, _farEnd.raw(), _farEnd.addressSize); } - string name() const { return "ConnectBG"; } - int inError() const { return _res; } - - private: - int _sock; - int _res; - SockAddr _farEnd; - }; - - bool MessagingPort::connect(SockAddr& _far) { - farEnd = _far; - - sock = socket(farEnd.getType(), SOCK_STREAM, 0); - if ( sock == INVALID_SOCKET ) { - log(_logLevel) << "ERROR: connect invalid socket " << errnoWithDescription() << endl; - return false; - } - - if ( _timeout > 0 ) { - setSockTimeouts( sock, _timeout ); - } - - ConnectBG bg(sock, farEnd); - bg.go(); - if ( bg.wait(5000) ) { - if ( bg.inError() ) { - closesocket(sock); - sock = -1; - return false; - } - } - else { - // time out the connect - closesocket(sock); - sock = -1; - bg.wait(); // so bg stays in scope until bg thread terminates - return false; - } - - if (farEnd.getType() != AF_UNIX) - disableNagle(sock); - -#ifdef SO_NOSIGPIPE - // osx - const int one = 1; - setsockopt(sock, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(int)); -#endif - - /* - // SO_LINGER is bad - #ifdef SO_LINGER - struct linger ling; - ling.l_onoff = 1; - ling.l_linger = 0; - setsockopt(sock, SOL_SOCKET, SO_LINGER, (char *) &ling, sizeof(ling)); - #endif - */ - return true; - } - - bool MessagingPort::recv(Message& m) { - try { -again: - mmm( log() << "* recv() sock:" << this->sock << endl; ) - int len = -1; - - char *lenbuf = (char *) &len; - int lft = 4; - recv( lenbuf, lft ); - - if ( len < 16 || len > 48000000 ) { // messages must be large enough for headers - if ( len == -1 ) { - // Endian check from the client, after connecting, to see what mode server is running in. - unsigned foo = 0x10203040; - send( (char *) &foo, 4, "endian" ); - goto again; - } - - if ( len == 542393671 ) { - // an http GET - log(_logLevel) << "looks like you're trying to access db over http on native driver port. please add 1000 for webserver" << endl; - string msg = "You are trying to access MongoDB on the native driver port. For http diagnostic access, add 1000 to the port number\n"; - stringstream ss; - ss << "HTTP/1.0 200 OK\r\nConnection: close\r\nContent-Type: text/plain\r\nContent-Length: " << msg.size() << "\r\n\r\n" << msg; - string s = ss.str(); - send( s.c_str(), s.size(), "http" ); - return false; - } - log(0) << "recv(): message len " << len << " is too large" << len << endl; - return false; - } - - int z = (len+1023)&0xfffffc00; - assert(z>=len); - MsgData *md = (MsgData *) malloc(z); - assert(md); - md->len = len; - - char *p = (char *) &md->id; - int left = len -4; - - try { - recv( p, left ); - } - catch (...) { - free(md); - throw; - } - - _bytesIn += len; - m.setData(md, true); - return true; - - } - catch ( const SocketException & e ) { - log(_logLevel + (e.shouldPrint() ? 0 : 1) ) << "SocketException: remote: " << remote() << " error: " << e << endl; - m.reset(); - return false; - } - } - - void MessagingPort::reply(Message& received, Message& response) { - say(/*received.from, */response, received.header()->id); - } - - void MessagingPort::reply(Message& received, Message& response, MSGID responseTo) { - say(/*received.from, */response, responseTo); - } - - bool MessagingPort::call(Message& toSend, Message& response) { - mmm( log() << "*call()" << endl; ) - say(toSend); - return recv( toSend , response ); - } - - bool MessagingPort::recv( const Message& toSend , Message& response ) { - while ( 1 ) { - bool ok = recv(response); - if ( !ok ) - return false; - //log() << "got response: " << response.data->responseTo << endl; - if ( response.header()->responseTo == toSend.header()->id ) - break; - error() << "MessagingPort::call() wrong id got:" << hex << (unsigned)response.header()->responseTo << " expect:" << (unsigned)toSend.header()->id << '\n' - << dec - << " toSend op: " << (unsigned)toSend.operation() << '\n' - << " response msgid:" << (unsigned)response.header()->id << '\n' - << " response len: " << (unsigned)response.header()->len << '\n' - << " response op: " << response.operation() << '\n' - << " farEnd: " << farEnd << endl; - assert(false); - response.reset(); - } - mmm( log() << "*call() end" << endl; ) - return true; - } - - void MessagingPort::say(Message& toSend, int responseTo) { - assert( !toSend.empty() ); - mmm( log() << "* say() sock:" << this->sock << " thr:" << GetCurrentThreadId() << endl; ) - toSend.header()->id = nextMessageId(); - toSend.header()->responseTo = responseTo; - - if ( piggyBackData && piggyBackData->len() ) { - mmm( log() << "* have piggy back" << endl; ) - if ( ( piggyBackData->len() + toSend.header()->len ) > 1300 ) { - // won't fit in a packet - so just send it off - piggyBackData->flush(); - } - else { - piggyBackData->append( toSend ); - piggyBackData->flush(); - return; - } - } - - toSend.send( *this, "say" ); - } - - // sends all data or throws an exception - void MessagingPort::send( const char * data , int len, const char *context ) { - _bytesOut += len; - while( len > 0 ) { - int ret = ::send( sock , data , len , portSendFlags ); - if ( ret == -1 ) { - if ( errno != EAGAIN || _timeout == 0 ) { - SocketException::Type t = SocketException::SEND_ERROR; -#if defined(_WINDOWS) - if( e == WSAETIMEDOUT ) t = SocketException::SEND_TIMEOUT; -#endif - log(_logLevel) << "MessagingPort " << context << " send() " << errnoWithDescription() << ' ' << farEnd.toString() << endl; - throw SocketException( t ); - } - else { - if ( !serverAlive( farEnd.toString() ) ) { - log(_logLevel) << "MessagingPort " << context << " send() remote dead " << farEnd.toString() << endl; - throw SocketException( SocketException::SEND_ERROR ); - } - } - } - else { - assert( ret <= len ); - len -= ret; - data += ret; - } - } - } - - // sends all data or throws an exception - void MessagingPort::send( const vector< pair< char *, int > > &data, const char *context ) { -#if defined(_WIN32) - // TODO use scatter/gather api - for( vector< pair< char *, int > >::const_iterator i = data.begin(); i != data.end(); ++i ) { - char * data = i->first; - int len = i->second; - send( data, len, context ); - } -#else - vector< struct iovec > d( data.size() ); - int i = 0; - for( vector< pair< char *, int > >::const_iterator j = data.begin(); j != data.end(); ++j ) { - if ( j->second > 0 ) { - d[ i ].iov_base = j->first; - d[ i ].iov_len = j->second; - ++i; - } - } - struct msghdr meta; - memset( &meta, 0, sizeof( meta ) ); - meta.msg_iov = &d[ 0 ]; - meta.msg_iovlen = d.size(); - - while( meta.msg_iovlen > 0 ) { - int ret = ::sendmsg( sock , &meta , portSendFlags ); - if ( ret == -1 ) { - if ( errno != EAGAIN || _timeout == 0 ) { - log(_logLevel) << "MessagingPort " << context << " send() " << errnoWithDescription() << ' ' << farEnd.toString() << endl; - throw SocketException( SocketException::SEND_ERROR ); - } - else { - if ( !serverAlive( farEnd.toString() ) ) { - log(_logLevel) << "MessagingPort " << context << " send() remote dead " << farEnd.toString() << endl; - throw SocketException( SocketException::SEND_ERROR ); - } - } - } - else { - struct iovec *& i = meta.msg_iov; - while( ret > 0 ) { - if ( i->iov_len > unsigned( ret ) ) { - i->iov_len -= ret; - i->iov_base = (char*)(i->iov_base) + ret; - ret = 0; - } - else { - ret -= i->iov_len; - ++i; - --(meta.msg_iovlen); - } - } - } - } -#endif - } - - void MessagingPort::recv( char * buf , int len ) { - unsigned retries = 0; - while( len > 0 ) { - int ret = ::recv( sock , buf , len , portRecvFlags ); - if ( ret > 0 ) { - if ( len <= 4 && ret != len ) - log(_logLevel) << "MessagingPort recv() got " << ret << " bytes wanted len=" << len << endl; - assert( ret <= len ); - len -= ret; - buf += ret; - } - else if ( ret == 0 ) { - log(3) << "MessagingPort recv() conn closed? " << farEnd.toString() << endl; - throw SocketException( SocketException::CLOSED ); - } - else { /* ret < 0 */ - int e = errno; - -#if defined(EINTR) && !defined(_WIN32) - if( e == EINTR ) { - if( ++retries == 1 ) { - log() << "EINTR retry" << endl; - continue; - } - } -#endif - if ( ( e == EAGAIN -#ifdef _WINDOWS - || e == WSAETIMEDOUT -#endif - ) && _timeout > 0 ) { - // this is a timeout - log(_logLevel) << "MessagingPort recv() timeout " << farEnd.toString() <<endl; - throw SocketException(SocketException::RECV_TIMEOUT); - } - - log(_logLevel) << "MessagingPort recv() " << errnoWithDescription(e) << " " << farEnd.toString() <<endl; - throw SocketException(SocketException::RECV_ERROR); - } - } - } - - int MessagingPort::unsafe_recv( char *buf, int max ) { - return ::recv( sock , buf , max , portRecvFlags ); - } - - void MessagingPort::piggyBack( Message& toSend , int responseTo ) { - - if ( toSend.header()->len > 1300 ) { - // not worth saving because its almost an entire packet - say( toSend ); - return; - } - - // we're going to be storing this, so need to set it up - toSend.header()->id = nextMessageId(); - toSend.header()->responseTo = responseTo; - - if ( ! piggyBackData ) - piggyBackData = new PiggyBackData( this ); - - piggyBackData->append( toSend ); - } - - unsigned MessagingPort::remotePort() const { - return farEnd.getPort(); - } - - HostAndPort MessagingPort::remote() const { - if ( _farEndParsed.port() == -1 ) - _farEndParsed = HostAndPort( farEnd ); - return _farEndParsed; - } - - - MSGID NextMsgId; - - struct MsgStart { - MsgStart() { - NextMsgId = (((unsigned) time(0)) << 16) ^ curTimeMillis(); - assert(MsgDataHeaderSize == 16); - } - } msgstart; - - MSGID nextMessageId() { - MSGID msgid = NextMsgId++; - return msgid; - } - - bool doesOpGetAResponse( int op ) { - return op == dbQuery || op == dbGetMore; - } - - const int DEFAULT_MAX_CONN = 20000; - const int MAX_MAX_CONN = 20000; - - int getMaxConnections() { -#ifdef _WIN32 - return DEFAULT_MAX_CONN; -#else - struct rlimit limit; - assert( getrlimit(RLIMIT_NOFILE,&limit) == 0 ); - - int max = (int)(limit.rlim_cur * .8); - - log(1) << "fd limit" - << " hard:" << limit.rlim_max - << " soft:" << limit.rlim_cur - << " max conn: " << max - << endl; - - if ( max > MAX_MAX_CONN ) - max = MAX_MAX_CONN; - - return max; -#endif - } - - void checkTicketNumbers() { - int want = getMaxConnections(); - int current = connTicketHolder.outof(); - if ( current != DEFAULT_MAX_CONN ) { - if ( current < want ) { - // they want fewer than they can handle - // which is fine - log(1) << " only allowing " << current << " connections" << endl; - return; - } - if ( current > want ) { - log() << " --maxConns too high, can only handle " << want << endl; - } - } - connTicketHolder.resize( want ); - } - - TicketHolder connTicketHolder(DEFAULT_MAX_CONN); - -} // namespace mongo diff --git a/util/mmap.cpp b/util/mmap.cpp index 18edc34..fa9ab73 100644 --- a/util/mmap.cpp +++ b/util/mmap.cpp @@ -20,6 +20,7 @@ #include "processinfo.h" #include "concurrency/rwlock.h" #include "../db/namespace.h" +#include "../db/cmdline.h" namespace mongo { @@ -61,7 +62,7 @@ namespace mongo { this is the administrative stuff */ - RWLock MongoFile::mmmutex("rw:mmmutex"); + RWLockRecursive MongoFile::mmmutex("mmmutex",10*60*1000 /* 10 minutes */); /* subclass must call in destructor (or at close). removes this from pathToFile and other maps @@ -69,7 +70,7 @@ namespace mongo { ideal to call close to the close, if the close is well before object destruction */ void MongoFile::destroyed() { - rwlock lk( mmmutex , true ); + mmmutex.assertExclusivelyLocked(); mmfiles.erase(this); pathToFile.erase( filename() ); } @@ -83,11 +84,12 @@ namespace mongo { } ++closingAllFiles; - rwlock lk( mmmutex , true ); + RWLockRecursive::Exclusive lk(mmmutex); ProgressMeter pm( mmfiles.size() , 2 , 1 ); - for ( set<MongoFile*>::iterator i = mmfiles.begin(); i != mmfiles.end(); i++ ) { - (*i)->close(); + set<MongoFile*> temp = mmfiles; + for ( set<MongoFile*>::iterator i = temp.begin(); i != temp.end(); i++ ) { + (*i)->close(); // close() now removes from mmfiles pm.hit(); } message << "closeAllFiles() finished"; @@ -97,7 +99,7 @@ namespace mongo { /*static*/ long long MongoFile::totalMappedLength() { unsigned long long total = 0; - rwlock lk( mmmutex , false ); + RWLockRecursive::Shared lk(mmmutex); for ( set<MongoFile*>::iterator i = mmfiles.begin(); i != mmfiles.end(); i++ ) total += (*i)->length(); @@ -120,7 +122,7 @@ namespace mongo { /*static*/ int MongoFile::_flushAll( bool sync ) { if ( ! sync ) { int num = 0; - rwlock lk( mmmutex , false ); + RWLockRecursive::Shared lk(mmmutex); for ( set<MongoFile*>::iterator i = mmfiles.begin(); i != mmfiles.end(); i++ ) { num++; MongoFile * mmf = *i; @@ -137,7 +139,7 @@ namespace mongo { while ( true ) { auto_ptr<Flushable> f; { - rwlock lk( mmmutex , false ); + RWLockRecursive::Shared lk(mmmutex); for ( set<MongoFile*>::iterator i = mmfiles.begin(); i != mmfiles.end(); i++ ) { MongoFile * mmf = *i; if ( ! mmf ) @@ -158,12 +160,12 @@ namespace mongo { } void MongoFile::created() { - rwlock lk( mmmutex , true ); + RWLockRecursive::Exclusive lk(mmmutex); mmfiles.insert(this); } void MongoFile::setFilename(string fn) { - rwlock( mmmutex, true ); + RWLockRecursive::Exclusive lk(mmmutex); assert( _filename.empty() ); _filename = fn; MongoFile *&ptf = pathToFile[fn]; @@ -173,7 +175,9 @@ namespace mongo { #if defined(_DEBUG) void MongoFile::markAllWritable() { - rwlock lk( mmmutex , false ); + if( cmdLine.dur ) + return; + RWLockRecursive::Shared lk(mmmutex); for ( set<MongoFile*>::iterator i = mmfiles.begin(); i != mmfiles.end(); i++ ) { MongoFile * mmf = *i; if (mmf) mmf->_lock(); @@ -181,7 +185,9 @@ namespace mongo { } void MongoFile::unmarkAllWritable() { - rwlock lk( mmmutex , false ); + if( cmdLine.dur ) + return; + RWLockRecursive::Shared lk(mmmutex); for ( set<MongoFile*>::iterator i = mmfiles.begin(); i != mmfiles.end(); i++ ) { MongoFile * mmf = *i; if (mmf) mmf->_unlock(); diff --git a/util/mmap.h b/util/mmap.h index 2ef4176..c1b14bb 100644 --- a/util/mmap.h +++ b/util/mmap.h @@ -21,6 +21,15 @@ namespace mongo { + class MAdvise { + void *_p; + unsigned _len; + public: + enum Advice { Sequential=1 }; + MAdvise(void *p, unsigned len, Advice a); + ~MAdvise(); // destructor resets the range to MADV_NORMAL + }; + /* the administrative-ish stuff here */ class MongoFile : boost::noncopyable { public: @@ -44,7 +53,8 @@ namespace mongo { template < class F > static void forEach( F fun ); - /** note: you need to be in mmmutex when using this. forEach (above) handles that for you automatically. */ + /** note: you need to be in mmmutex when using this. forEach (above) handles that for you automatically. +*/ static set<MongoFile*>& getAllFiles() { return mmfiles; } // callbacks if you need them @@ -100,7 +110,9 @@ namespace mongo { static set<MongoFile*> mmfiles; public: static map<string,MongoFile*> pathToFile; - static RWLock mmmutex; + + // lock order: lock dbMutex before this if you lock both + static RWLockRecursive mmmutex; }; /** look up a MMF by filename. scoped mutex locking convention. @@ -111,7 +123,7 @@ namespace mongo { */ class MongoFileFinder : boost::noncopyable { public: - MongoFileFinder() : _lk(MongoFile::mmmutex,false) { } + MongoFileFinder() : _lk(MongoFile::mmmutex) { } /** @return The MongoFile object associated with the specified file name. If no file is open with the specified name, returns null. @@ -122,7 +134,7 @@ namespace mongo { } private: - rwlock _lk; + RWLockRecursive::Shared _lk; }; struct MongoFileAllowWrites { @@ -135,11 +147,18 @@ namespace mongo { }; class MemoryMappedFile : public MongoFile { + protected: + virtual void* viewForFlushing() { + if( views.size() == 0 ) + return 0; + assert( views.size() == 1 ); + return views[0]; + } public: MemoryMappedFile(); virtual ~MemoryMappedFile() { - destroyed(); // cleans up from the master list of mmaps + RWLockRecursive::Exclusive lk(mmmutex); close(); } @@ -147,6 +166,8 @@ namespace mongo { // Throws exception if file doesn't exist. (dm may2010: not sure if this is always true?) void* map(const char *filename); + + /** @param options see MongoFile::Options */ void* mapWithOptions(const char *filename, int options); /* Creates with length if DNE, otherwise uses existing file length, @@ -187,7 +208,7 @@ namespace mongo { HANDLE maphandle; vector<void *> views; unsigned long long len; - + #ifdef _WIN32 boost::shared_ptr<mutex> _flushMutex; void clearWritableBits(void *privateView); @@ -212,7 +233,7 @@ namespace mongo { /** p is called from within a mutex that MongoFile uses. so be careful not to deadlock. */ template < class F > inline void MongoFile::forEach( F p ) { - rwlock lk( mmmutex , false ); + RWLockRecursive::Shared lklk(mmmutex); for ( set<MongoFile*>::iterator i = mmfiles.begin(); i != mmfiles.end(); i++ ) p(*i); } @@ -227,10 +248,11 @@ namespace mongo { bool get(unsigned i) const { unsigned x = i / 32; assert( x < MemoryMappedFile::NChunks ); - return bits[x] & (1 << (i%32)); + return (bits[x] & (1 << (i%32))) != 0; } void set(unsigned i) { unsigned x = i / 32; + wassert( x < (MemoryMappedFile::NChunks*2/3) ); // warn if getting close to limit assert( x < MemoryMappedFile::NChunks ); bits[x] |= (1 << (i%32)); } diff --git a/util/mmap_posix.cpp b/util/mmap_posix.cpp index f47a06f..5b5e86d 100644 --- a/util/mmap_posix.cpp +++ b/util/mmap_posix.cpp @@ -19,13 +19,12 @@ #include "mmap.h" #include "file_allocator.h" #include "../db/concurrency.h" - #include <errno.h> #include <sys/mman.h> #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> - +#include "../util/processinfo.h" #include "mongoutils/str.h" using namespace mongoutils; @@ -39,6 +38,7 @@ namespace mongo { } void MemoryMappedFile::close() { + mmmutex.assertExclusivelyLocked(); for( vector<void*>::iterator i = views.begin(); i != views.end(); i++ ) { munmap(*i,len); } @@ -47,6 +47,7 @@ namespace mongo { if ( fd ) ::close(fd); fd = 0; + destroyed(); // cleans up from the master list of mmaps } #ifndef O_NOATIME @@ -57,6 +58,19 @@ namespace mongo { #define MAP_NORESERVE (0) #endif +#if defined(__sunos__) + MAdvise::MAdvise(void *,unsigned, Advice) { } + MAdvise::~MAdvise() { } +#else + MAdvise::MAdvise(void *p, unsigned len, Advice a) : _p(p), _len(len) { + assert( a == Sequential ); // more later + madvise(_p,_len,MADV_SEQUENTIAL); + } + MAdvise::~MAdvise() { + madvise(_p,_len,MADV_NORMAL); + } +#endif + void* MemoryMappedFile::map(const char *filename, unsigned long long &length, int options) { // length may be updated by callee. setFilename(filename); @@ -68,6 +82,7 @@ namespace mongo { fd = open(filename, O_RDWR | O_NOATIME); if ( fd <= 0 ) { log() << "couldn't open " << filename << ' ' << errnoWithDescription() << endl; + fd = 0; // our sentinel for not opened return 0; } @@ -149,6 +164,7 @@ namespace mongo { int err = errno; error() << "13601 Couldn't remap private view: " << errnoWithDescription(err) << endl; log() << "aborting" << endl; + printMemInfo(); abort(); } assert( x == oldPrivateAddr ); @@ -158,7 +174,7 @@ namespace mongo { void MemoryMappedFile::flush(bool sync) { if ( views.empty() || fd == 0 ) return; - if ( msync(views[0], len, sync ? MS_SYNC : MS_ASYNC) ) + if ( msync(viewForFlushing(), len, sync ? MS_SYNC : MS_ASYNC) ) problem() << "msync " << errnoWithDescription() << endl; } @@ -181,7 +197,7 @@ namespace mongo { }; MemoryMappedFile::Flushable * MemoryMappedFile::prepareFlush() { - return new PosixFlushable( views.empty() ? 0 : views[0] , fd , len ); + return new PosixFlushable( viewForFlushing() , fd , len ); } void MemoryMappedFile::_lock() { diff --git a/util/mmap_win.cpp b/util/mmap_win.cpp index 0b0b834..9173d7b 100644 --- a/util/mmap_win.cpp +++ b/util/mmap_win.cpp @@ -18,7 +18,6 @@ #include "pch.h" #include "mmap.h" #include "text.h" -#include <windows.h> #include "../db/mongommf.h" #include "../db/concurrency.h" @@ -27,6 +26,9 @@ namespace mongo { mutex mapViewMutex("mapView"); ourbitset writable; + MAdvise::MAdvise(void *,unsigned, Advice) { } + MAdvise::~MAdvise() { } + /** notification on unmapping so we can clear writable bits */ void MemoryMappedFile::clearWritableBits(void *p) { for( unsigned i = ((size_t)p)/ChunkSize; i <= (((size_t)p)+len)/ChunkSize; i++ ) { @@ -44,6 +46,7 @@ namespace mongo { } void MemoryMappedFile::close() { + mmmutex.assertExclusivelyLocked(); for( vector<void*>::iterator i = views.begin(); i != views.end(); i++ ) { clearWritableBits(*i); UnmapViewOfFile(*i); @@ -55,6 +58,7 @@ namespace mongo { if ( fd ) CloseHandle(fd); fd = 0; + destroyed(); // cleans up from the master list of mmaps } unsigned long long mapped = 0; @@ -138,7 +142,8 @@ namespace mongo { } if ( view == 0 ) { DWORD e = GetLastError(); - log() << "MapViewOfFile failed " << filename << " " << errnoWithDescription(e) << endl; + log() << "MapViewOfFile failed " << filename << " " << errnoWithDescription(e) << + ((sizeof(void*)==4)?" (32 bit build)":"") << endl; close(); } else { @@ -183,13 +188,13 @@ namespace mongo { void MemoryMappedFile::flush(bool sync) { uassert(13056, "Async flushing not supported on windows", sync); if( !views.empty() ) { - WindowsFlushable f( views[0] , fd , filename() , _flushMutex); + WindowsFlushable f( viewForFlushing() , fd , filename() , _flushMutex); f.flush(); } } MemoryMappedFile::Flushable * MemoryMappedFile::prepareFlush() { - return new WindowsFlushable( views.empty() ? 0 : views[0] , fd , filename() , _flushMutex ); + return new WindowsFlushable( viewForFlushing() , fd , filename() , _flushMutex ); } void MemoryMappedFile::_lock() {} void MemoryMappedFile::_unlock() {} diff --git a/util/mongoutils/README b/util/mongoutils/README index fd2a589..f61277c 100755 --- a/util/mongoutils/README +++ b/util/mongoutils/README @@ -11,3 +11,5 @@ So basically, easy to use, general purpose stuff, with no arduous dependencies to drop into any new project. + + *** PLACE UNIT TESTS IN mongoutils/test.cpp *** diff --git a/util/mongoutils/str.h b/util/mongoutils/str.h index ea8f938..57b94fa 100644 --- a/util/mongoutils/str.h +++ b/util/mongoutils/str.h @@ -29,6 +29,8 @@ #include <string> #include <sstream> + +// this violates the README rules for mongoutils: #include "../../bson/util/builder.h" namespace mongoutils { @@ -48,13 +50,11 @@ namespace mongoutils { class stream { public: mongo::StringBuilder ss; - template<class T> stream& operator<<(const T& v) { ss << v; return *this; } - operator std::string () const { return ss.str(); } }; @@ -106,13 +106,13 @@ namespace mongoutils { return strchr(s.c_str(), x) != 0; } - /** @return everything befor the character x, else entire string */ + /** @return everything before the character x, else entire string */ inline string before(const string& s, char x) { const char *p = strchr(s.c_str(), x); return (p != 0) ? s.substr(0, p-s.c_str()) : s; } - /** @return everything befor the string x, else entire string */ + /** @return everything before the string x, else entire string */ inline string before(const string& s, const string& x) { const char *p = strstr(s.c_str(), x.c_str()); return (p != 0) ? s.substr(0, p-s.c_str()) : s; diff --git a/util/mongoutils/test.cpp b/util/mongoutils/test.cpp index d8ee461..45268c5 100644 --- a/util/mongoutils/test.cpp +++ b/util/mongoutils/test.cpp @@ -19,9 +19,9 @@ * limitations under the License. */ +#include <assert.h> #include "str.h" #include "html.h" -#include <assert.h> using namespace std; using namespace mongoutils; diff --git a/util/hostandport.h b/util/net/hostandport.h index fd27296..573e8ee 100644 --- a/util/hostandport.h +++ b/util/net/hostandport.h @@ -18,8 +18,8 @@ #pragma once #include "sock.h" -#include "../db/cmdline.h" -#include "mongoutils/str.h" +#include "../../db/cmdline.h" +#include "../mongoutils/str.h" namespace mongo { @@ -70,8 +70,10 @@ namespace mongo { bool isLocalHost() const; - // @returns host:port - string toString() const; + /** + * @param includePort host:port if true, host otherwise + */ + string toString( bool includePort=true ) const; operator string() const { return toString(); } @@ -87,24 +89,6 @@ namespace mongo { int _port; // -1 indicates unspecified }; - /** returns true if strings seem to be the same hostname. - "nyc1", "nyc1.acme", and "nyc1.acme.com" are treated as the same. - */ - inline bool sameHostname(const string& a, const string& b) { - size_t prefixLen = str::shareCommonPrefix(a.c_str(), b.c_str()); - - if (prefixLen == a.size()) { // (a == b) or (a isPrefixOf b) - if ( b[prefixLen] == '.' || b[prefixLen] == '\0') - return true; - } - else if(prefixLen == b.size()) { // (b isPrefixOf a) - if ( a[prefixLen] == '.') // can't be '\0' - return true; - } - - return false; - } - inline HostAndPort HostAndPort::Me() { const char* ips = cmdLine.bind_ip.c_str(); while(*ips) { @@ -130,7 +114,10 @@ namespace mongo { return HostAndPort(h, cmdLine.port); } - inline string HostAndPort::toString() const { + inline string HostAndPort::toString( bool includePort ) const { + if ( ! includePort ) + return _host; + stringstream ss; ss << _host; if ( _port != -1 ) { @@ -150,7 +137,12 @@ namespace mongo { } inline bool HostAndPort::isLocalHost() const { - return _host == "localhost" || startsWith(_host.c_str(), "127.") || _host == "::1"; + return ( _host == "localhost" + || startsWith(_host.c_str(), "127.") + || _host == "::1" + || _host == "anonymous unix socket" + || _host.c_str()[0] == '/' // unix socket + ); } inline HostAndPort::HostAndPort(string s) { diff --git a/util/httpclient.cpp b/util/net/httpclient.cpp index 61d5671..16eaa0a 100644 --- a/util/httpclient.cpp +++ b/util/net/httpclient.cpp @@ -19,7 +19,9 @@ #include "httpclient.h" #include "sock.h" #include "message.h" -#include "../bson/util/builder.h" +#include "message_port.h" +#include "../mongoutils/str.h" +#include "../../bson/util/builder.h" namespace mongo { @@ -36,8 +38,15 @@ namespace mongo { } int HttpClient::_go( const char * command , string url , const char * body , Result * result ) { - uassert( 10271 , "invalid url" , url.find( "http://" ) == 0 ); - url = url.substr( 7 ); + bool ssl = false; + if ( url.find( "https://" ) == 0 ) { + ssl = true; + url = url.substr( 8 ); + } + else { + uassert( 10271 , "invalid url" , url.find( "http://" ) == 0 ); + url = url.substr( 7 ); + } string host , path; if ( url.find( "/" ) == string::npos ) { @@ -54,7 +63,7 @@ namespace mongo { HD( "path [" << path << "]" ); string server = host; - int port = 80; + int port = ssl ? 443 : 80; string::size_type idx = host.find( ":" ); if ( idx != string::npos ) { @@ -87,18 +96,27 @@ namespace mongo { SockAddr addr( server.c_str() , port ); HD( "addr: " << addr.toString() ); - MessagingPort p; - if ( ! p.connect( addr ) ) + Socket sock; + if ( ! sock.connect( addr ) ) return -1; + + if ( ssl ) { +#ifdef MONGO_SSL + _checkSSLManager(); + sock.secure( _sslManager.get() ); +#else + uasserted( 15862 , "no ssl support" ); +#endif + } { const char * out = req.c_str(); int toSend = req.size(); - p.send( out , toSend, "_go" ); + sock.send( out , toSend, "_go" ); } char buf[4096]; - int got = p.unsafe_recv( buf , 4096 ); + int got = sock.unsafe_recv( buf , 4096 ); buf[got] = 0; int rc; @@ -110,7 +128,7 @@ namespace mongo { if ( result ) sb << buf; - while ( ( got = p.unsafe_recv( buf , 4096 ) ) > 0) { + while ( ( got = sock.unsafe_recv( buf , 4096 ) ) > 0) { if ( result ) sb << buf; } @@ -141,10 +159,19 @@ namespace mongo { if ( h.size() == 0 ) break; + + i = h.find( ':' ); + if ( i != string::npos ) + _headers[h.substr(0,i)] = str::ltrim(h.substr(i+1)); } _body = entire; } +#ifdef MONGO_SSL + void HttpClient::_checkSSLManager() { + _sslManager.reset( new SSLManager( true ) ); + } +#endif } diff --git a/util/httpclient.h b/util/net/httpclient.h index d66544e..c3f8c82 100644 --- a/util/httpclient.h +++ b/util/net/httpclient.h @@ -17,13 +17,16 @@ #pragma once -#include "../pch.h" +#include "../../pch.h" +#include "sock.h" namespace mongo { - class HttpClient { + class HttpClient : boost::noncopyable { public: + typedef map<string,string> Headers; + class Result { public: Result() {} @@ -32,7 +35,7 @@ namespace mongo { return _entireResponse; } - const map<string,string> getHeaders() const { + const Headers getHeaders() const { return _headers; } @@ -47,7 +50,7 @@ namespace mongo { int _code; string _entireResponse; - map<string,string> _headers; + Headers _headers; string _body; friend class HttpClient; @@ -66,6 +69,11 @@ namespace mongo { private: int _go( const char * command , string url , const char * body , Result * result ); +#ifdef MONGO_SSL + void _checkSSLManager(); + + scoped_ptr<SSLManager> _sslManager; +#endif }; } diff --git a/util/net/listen.cpp b/util/net/listen.cpp new file mode 100644 index 0000000..6ee25b4 --- /dev/null +++ b/util/net/listen.cpp @@ -0,0 +1,391 @@ +// listen.h + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include "pch.h" +#include "listen.h" +#include "message_port.h" + +#ifndef _WIN32 + +# ifndef __sunos__ +# include <ifaddrs.h> +# endif +# include <sys/resource.h> +# include <sys/stat.h> + +#include <sys/types.h> +#include <sys/socket.h> +#include <sys/un.h> +#include <netinet/in.h> +#include <netinet/tcp.h> +#include <arpa/inet.h> +#include <errno.h> +#include <netdb.h> +#ifdef __openbsd__ +# include <sys/uio.h> +#endif + +#else + +// errno doesn't work for winsock. +#undef errno +#define errno WSAGetLastError() + +#endif + +namespace mongo { + + + void checkTicketNumbers(); + + + // ----- Listener ------- + + const Listener* Listener::_timeTracker; + + vector<SockAddr> ipToAddrs(const char* ips, int port, bool useUnixSockets) { + vector<SockAddr> out; + if (*ips == '\0') { + out.push_back(SockAddr("0.0.0.0", port)); // IPv4 all + + if (IPv6Enabled()) + out.push_back(SockAddr("::", port)); // IPv6 all +#ifndef _WIN32 + if (useUnixSockets) + out.push_back(SockAddr(makeUnixSockPath(port).c_str(), port)); // Unix socket +#endif + return out; + } + + while(*ips) { + string ip; + const char * comma = strchr(ips, ','); + if (comma) { + ip = string(ips, comma - ips); + ips = comma + 1; + } + else { + ip = string(ips); + ips = ""; + } + + SockAddr sa(ip.c_str(), port); + out.push_back(sa); + +#ifndef _WIN32 + if (useUnixSockets && (sa.getAddr() == "127.0.0.1" || sa.getAddr() == "0.0.0.0")) // only IPv4 + out.push_back(SockAddr(makeUnixSockPath(port).c_str(), port)); +#endif + } + return out; + + } + + Listener::Listener(const string& name, const string &ip, int port, bool logConnect ) + : _port(port), _name(name), _ip(ip), _logConnect(logConnect), _elapsedTime(0) { +#ifdef MONGO_SSL + _ssl = 0; + _sslPort = 0; + + if ( cmdLine.sslOnNormalPorts && cmdLine.sslServerManager ) { + secure( cmdLine.sslServerManager ); + } +#endif + } + + Listener::~Listener() { + if ( _timeTracker == this ) + _timeTracker = 0; + } + +#ifdef MONGO_SSL + void Listener::secure( SSLManager* manager ) { + _ssl = manager; + } + + void Listener::addSecurePort( SSLManager* manager , int additionalPort ) { + _ssl = manager; + _sslPort = additionalPort; + } + +#endif + + bool Listener::_setupSockets( const vector<SockAddr>& mine , vector<int>& socks ) { + for (vector<SockAddr>::const_iterator it=mine.begin(), end=mine.end(); it != end; ++it) { + const SockAddr& me = *it; + + SOCKET sock = ::socket(me.getType(), SOCK_STREAM, 0); + massert( 15863 , str::stream() << "listen(): invalid socket? " << errnoWithDescription() , sock >= 0 ); + + if (me.getType() == AF_UNIX) { +#if !defined(_WIN32) + if (unlink(me.getAddr().c_str()) == -1) { + int x = errno; + if (x != ENOENT) { + log() << "couldn't unlink socket file " << me << errnoWithDescription(x) << " skipping" << endl; + continue; + } + } +#endif + } + else if (me.getType() == AF_INET6) { + // IPv6 can also accept IPv4 connections as mapped addresses (::ffff:127.0.0.1) + // That causes a conflict if we don't do set it to IPV6_ONLY + const int one = 1; + setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (const char*) &one, sizeof(one)); + } + +#if !defined(_WIN32) + { + const int one = 1; + if ( setsockopt( sock , SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0 ) + out() << "Failed to set socket opt, SO_REUSEADDR" << endl; + } +#endif + + if ( ::bind(sock, me.raw(), me.addressSize) != 0 ) { + int x = errno; + error() << "listen(): bind() failed " << errnoWithDescription(x) << " for socket: " << me.toString() << endl; + if ( x == EADDRINUSE ) + error() << " addr already in use" << endl; + closesocket(sock); + return false; + } + +#if !defined(_WIN32) + if (me.getType() == AF_UNIX) { + if (chmod(me.getAddr().c_str(), 0777) == -1) { + error() << "couldn't chmod socket file " << me << errnoWithDescription() << endl; + } + ListeningSockets::get()->addPath( me.getAddr() ); + } +#endif + + if ( ::listen(sock, 128) != 0 ) { + error() << "listen(): listen() failed " << errnoWithDescription() << endl; + closesocket(sock); + return false; + } + + ListeningSockets::get()->add( sock ); + + socks.push_back(sock); + } + + return true; + } + + void Listener::initAndListen() { + checkTicketNumbers(); + vector<int> socks; + set<int> sslSocks; + + { // normal sockets + vector<SockAddr> mine = ipToAddrs(_ip.c_str(), _port, (!cmdLine.noUnixSocket && useUnixSockets())); + if ( ! _setupSockets( mine , socks ) ) + return; + } + +#ifdef MONGO_SSL + if ( _ssl && _sslPort > 0 ) { + unsigned prev = socks.size(); + + vector<SockAddr> mine = ipToAddrs(_ip.c_str(), _sslPort, false ); + if ( ! _setupSockets( mine , socks ) ) + return; + + for ( unsigned i=prev; i<socks.size(); i++ ) { + sslSocks.insert( socks[i] ); + } + + } +#endif + + SOCKET maxfd = 0; // needed for select() + for ( unsigned i=0; i<socks.size(); i++ ) { + if ( socks[i] > maxfd ) + maxfd = socks[i]; + } + +#ifdef MONGO_SSL + if ( _ssl == 0 ) { + _logListen( _port , false ); + } + else if ( _sslPort == 0 ) { + _logListen( _port , true ); + } + else { + // both + _logListen( _port , false ); + _logListen( _sslPort , true ); + } +#else + _logListen( _port , false ); +#endif + + static long connNumber = 0; + struct timeval maxSelectTime; + while ( ! inShutdown() ) { + fd_set fds[1]; + FD_ZERO(fds); + + for (vector<int>::iterator it=socks.begin(), end=socks.end(); it != end; ++it) { + FD_SET(*it, fds); + } + + maxSelectTime.tv_sec = 0; + maxSelectTime.tv_usec = 10000; + const int ret = select(maxfd+1, fds, NULL, NULL, &maxSelectTime); + + if (ret == 0) { +#if defined(__linux__) + _elapsedTime += ( 10000 - maxSelectTime.tv_usec ) / 1000; +#else + _elapsedTime += 10; +#endif + continue; + } + + if (ret < 0) { + int x = errno; +#ifdef EINTR + if ( x == EINTR ) { + log() << "select() signal caught, continuing" << endl; + continue; + } +#endif + if ( ! inShutdown() ) + log() << "select() failure: ret=" << ret << " " << errnoWithDescription(x) << endl; + return; + } + +#if defined(__linux__) + _elapsedTime += max(ret, (int)(( 10000 - maxSelectTime.tv_usec ) / 1000)); +#else + _elapsedTime += ret; // assume 1ms to grab connection. very rough +#endif + + for (vector<int>::iterator it=socks.begin(), end=socks.end(); it != end; ++it) { + if (! (FD_ISSET(*it, fds))) + continue; + + SockAddr from; + int s = accept(*it, from.raw(), &from.addressSize); + if ( s < 0 ) { + int x = errno; // so no global issues + if ( x == ECONNABORTED || x == EBADF ) { + log() << "Listener on port " << _port << " aborted" << endl; + return; + } + if ( x == 0 && inShutdown() ) { + return; // socket closed + } + if( !inShutdown() ) { + log() << "Listener: accept() returns " << s << " " << errnoWithDescription(x) << endl; + if (x == EMFILE || x == ENFILE) { + // Connection still in listen queue but we can't accept it yet + error() << "Out of file descriptors. Waiting one second before trying to accept more connections." << warnings; + sleepsecs(1); + } + } + continue; + } + if (from.getType() != AF_UNIX) + disableNagle(s); + if ( _logConnect && ! cmdLine.quiet ) + log() << "connection accepted from " << from.toString() << " #" << ++connNumber << endl; + + Socket newSock = Socket(s, from); +#ifdef MONGO_SSL + if ( _ssl && ( _sslPort == 0 || sslSocks.count(*it) ) ) { + newSock.secureAccepted( _ssl ); + } +#endif + accepted( newSock ); + } + } + } + + void Listener::_logListen( int port , bool ssl ) { + log() << _name << ( _name.size() ? " " : "" ) << "waiting for connections on port " << port << ( ssl ? " ssl" : "" ) << endl; + } + + + void Listener::accepted(Socket socket) { + accepted( new MessagingPort(socket) ); + } + + void Listener::accepted(MessagingPort *mp) { + assert(!"You must overwrite one of the accepted methods"); + } + + // ----- ListeningSockets ------- + + ListeningSockets* ListeningSockets::_instance = new ListeningSockets(); + + ListeningSockets* ListeningSockets::get() { + return _instance; + } + + // ------ connection ticket and control ------ + + const int DEFAULT_MAX_CONN = 20000; + const int MAX_MAX_CONN = 20000; + + int getMaxConnections() { +#ifdef _WIN32 + return DEFAULT_MAX_CONN; +#else + struct rlimit limit; + assert( getrlimit(RLIMIT_NOFILE,&limit) == 0 ); + + int max = (int)(limit.rlim_cur * .8); + + log(1) << "fd limit" + << " hard:" << limit.rlim_max + << " soft:" << limit.rlim_cur + << " max conn: " << max + << endl; + + if ( max > MAX_MAX_CONN ) + max = MAX_MAX_CONN; + + return max; +#endif + } + + void checkTicketNumbers() { + int want = getMaxConnections(); + int current = connTicketHolder.outof(); + if ( current != DEFAULT_MAX_CONN ) { + if ( current < want ) { + // they want fewer than they can handle + // which is fine + log(1) << " only allowing " << current << " connections" << endl; + return; + } + if ( current > want ) { + log() << " --maxConns too high, can only handle " << want << endl; + } + } + connTicketHolder.resize( want ); + } + + TicketHolder connTicketHolder(DEFAULT_MAX_CONN); + +} diff --git a/util/net/listen.h b/util/net/listen.h new file mode 100644 index 0000000..415db1e --- /dev/null +++ b/util/net/listen.h @@ -0,0 +1,190 @@ +// listen.h + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "sock.h" + +namespace mongo { + + class MessagingPort; + + class Listener : boost::noncopyable { + public: + + Listener(const string& name, const string &ip, int port, bool logConnect=true ); + + virtual ~Listener(); + +#ifdef MONGO_SSL + /** + * make this an ssl socket + * ownership of SSLManager remains with the caller + */ + void secure( SSLManager* manager ); + + void addSecurePort( SSLManager* manager , int additionalPort ); +#endif + + void initAndListen(); // never returns unless error (start a thread) + + /* spawn a thread, etc., then return */ + virtual void accepted(Socket socket); + virtual void accepted(MessagingPort *mp); + + const int _port; + + /** + * @return a rough estimate of elapsed time since the server started + */ + long long getMyElapsedTimeMillis() const { return _elapsedTime; } + + void setAsTimeTracker() { + _timeTracker = this; + } + + static const Listener* getTimeTracker() { + return _timeTracker; + } + + static long long getElapsedTimeMillis() { + if ( _timeTracker ) + return _timeTracker->getMyElapsedTimeMillis(); + + // should this assert or throw? seems like callers may not expect to get zero back, certainly not forever. + return 0; + } + + private: + string _name; + string _ip; + bool _logConnect; + long long _elapsedTime; + +#ifdef MONGO_SSL + SSLManager* _ssl; + int _sslPort; +#endif + + /** + * @return true iff everything went ok + */ + bool _setupSockets( const vector<SockAddr>& mine , vector<int>& socks ); + + void _logListen( int port , bool ssl ); + + static const Listener* _timeTracker; + + virtual bool useUnixSockets() const { return false; } + }; + + /** + * keep track of elapsed time + * after a set amount of time, tells you to do something + * only in this file because depends on Listener + */ + class ElapsedTracker { + public: + ElapsedTracker( int hitsBetweenMarks , int msBetweenMarks ) + : _h( hitsBetweenMarks ) , _ms( msBetweenMarks ) , _pings(0) { + _last = Listener::getElapsedTimeMillis(); + } + + /** + * call this for every iteration + * returns true if one of the triggers has gone off + */ + bool ping() { + if ( ( ++_pings % _h ) == 0 ) { + _last = Listener::getElapsedTimeMillis(); + return true; + } + + long long now = Listener::getElapsedTimeMillis(); + if ( now - _last > _ms ) { + _last = now; + return true; + } + + return false; + } + + private: + int _h; + int _ms; + + unsigned long long _pings; + + long long _last; + + }; + + class ListeningSockets { + public: + ListeningSockets() + : _mutex("ListeningSockets") + , _sockets( new set<int>() ) + , _socketPaths( new set<string>() ) + { } + void add( int sock ) { + scoped_lock lk( _mutex ); + _sockets->insert( sock ); + } + void addPath( string path ) { + scoped_lock lk( _mutex ); + _socketPaths->insert( path ); + } + void remove( int sock ) { + scoped_lock lk( _mutex ); + _sockets->erase( sock ); + } + void closeAll() { + set<int>* sockets; + set<string>* paths; + + { + scoped_lock lk( _mutex ); + sockets = _sockets; + _sockets = new set<int>(); + paths = _socketPaths; + _socketPaths = new set<string>(); + } + + for ( set<int>::iterator i=sockets->begin(); i!=sockets->end(); i++ ) { + int sock = *i; + log() << "closing listening socket: " << sock << endl; + closesocket( sock ); + } + + for ( set<string>::iterator i=paths->begin(); i!=paths->end(); i++ ) { + string path = *i; + log() << "removing socket file: " << path << endl; + ::remove( path.c_str() ); + } + } + static ListeningSockets* get(); + private: + mongo::mutex _mutex; + set<int>* _sockets; + set<string>* _socketPaths; // for unix domain sockets + static ListeningSockets* _instance; + }; + + + extern TicketHolder connTicketHolder; + +} diff --git a/util/net/message.cpp b/util/net/message.cpp new file mode 100644 index 0000000..a84e5c4 --- /dev/null +++ b/util/net/message.cpp @@ -0,0 +1,64 @@ +// message.cpp + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "pch.h" + +#include <fcntl.h> +#include <errno.h> +#include <time.h> + +#include "message.h" +#include "message_port.h" +#include "listen.h" + +#include "../goodies.h" +#include "../../client/dbclient.h" + +namespace mongo { + + void Message::send( MessagingPort &p, const char *context ) { + if ( empty() ) { + return; + } + if ( _buf != 0 ) { + p.send( (char*)_buf, _buf->len, context ); + } + else { + p.send( _data, context ); + } + } + + MSGID NextMsgId; + + /*struct MsgStart { + MsgStart() { + NextMsgId = (((unsigned) time(0)) << 16) ^ curTimeMillis(); + assert(MsgDataHeaderSize == 16); + } + } msgstart;*/ + + MSGID nextMessageId() { + MSGID msgid = NextMsgId++; + return msgid; + } + + bool doesOpGetAResponse( int op ) { + return op == dbQuery || op == dbGetMore; + } + + +} // namespace mongo diff --git a/util/message.h b/util/net/message.h index f114445..16da5d6 100644 --- a/util/message.h +++ b/util/net/message.h @@ -1,4 +1,4 @@ -// Message.h +// message.h /* Copyright 2009 10gen Inc. * @@ -17,154 +17,17 @@ #pragma once -#include "../util/sock.h" -#include "../bson/util/atomic_int.h" +#include "sock.h" +#include "../../bson/util/atomic_int.h" #include "hostandport.h" namespace mongo { - extern bool noUnixSocket; - class Message; class MessagingPort; class PiggyBackData; - typedef AtomicUInt MSGID; - - class Listener : boost::noncopyable { - public: - Listener(const string &ip, int p, bool logConnect=true ) : _port(p), _ip(ip), _logConnect(logConnect), _elapsedTime(0) { } - virtual ~Listener() { - if ( _timeTracker == this ) - _timeTracker = 0; - } - void initAndListen(); // never returns unless error (start a thread) - - /* spawn a thread, etc., then return */ - virtual void accepted(int sock, const SockAddr& from); - virtual void accepted(MessagingPort *mp) { - assert(!"You must overwrite one of the accepted methods"); - } - - const int _port; - - /** - * @return a rough estimate of elepased time since the server started - */ - long long getMyElapsedTimeMillis() const { return _elapsedTime; } - - void setAsTimeTracker() { - _timeTracker = this; - } - - static const Listener* getTimeTracker() { - return _timeTracker; - } - - static long long getElapsedTimeMillis() { - if ( _timeTracker ) - return _timeTracker->getMyElapsedTimeMillis(); - - // should this assert or throw? seems like callers may not expect to get zero back, certainly not forever. - return 0; - } - - private: - string _ip; - bool _logConnect; - long long _elapsedTime; - - static const Listener* _timeTracker; - }; - - class AbstractMessagingPort : boost::noncopyable { - public: - virtual ~AbstractMessagingPort() { } - virtual void reply(Message& received, Message& response, MSGID responseTo) = 0; // like the reply below, but doesn't rely on received.data still being available - virtual void reply(Message& received, Message& response) = 0; - - virtual HostAndPort remote() const = 0; - virtual unsigned remotePort() const = 0; - - private: - int _clientId; - }; - - class MessagingPort : public AbstractMessagingPort { - public: - MessagingPort(int sock, const SockAddr& farEnd); - - // in some cases the timeout will actually be 2x this value - eg we do a partial send, - // then the timeout fires, then we try to send again, then the timeout fires again with - // no data sent, then we detect that the other side is down - MessagingPort(double so_timeout = 0, int logLevel = 0 ); - - virtual ~MessagingPort(); - - void shutdown(); - - bool connect(SockAddr& farEnd); - - /* it's assumed if you reuse a message object, that it doesn't cross MessagingPort's. - also, the Message data will go out of scope on the subsequent recv call. - */ - bool recv(Message& m); - void reply(Message& received, Message& response, MSGID responseTo); - void reply(Message& received, Message& response); - bool call(Message& toSend, Message& response); - - void say(Message& toSend, int responseTo = -1); - - /** - * this is used for doing 'async' queries - * instead of doing call( to , from ) - * you would do - * say( to ) - * recv( from ) - * Note: if you fail to call recv and someone else uses this port, - * horrible things will happend - */ - bool recv( const Message& sent , Message& response ); - - void piggyBack( Message& toSend , int responseTo = -1 ); - - virtual unsigned remotePort() const; - virtual HostAndPort remote() const; - - // send len or throw SocketException - void send( const char * data , int len, const char *context ); - void send( const vector< pair< char *, int > > &data, const char *context ); - - // recv len or throw SocketException - void recv( char * data , int len ); - - int unsafe_recv( char *buf, int max ); - void clearCounters() { _bytesIn = 0; _bytesOut = 0; } - long long getBytesIn() const { return _bytesIn; } - long long getBytesOut() const { return _bytesOut; } - private: - int sock; - PiggyBackData * piggyBackData; - - long long _bytesIn; - long long _bytesOut; - - // this is the parsed version of farEnd - // mutable because its initialized only on call to remote() - mutable HostAndPort _farEndParsed; - - public: - SockAddr farEnd; - double _timeout; - int _logLevel; // passed to log() when logging errors - - static void closeAllSockets(unsigned tagMask = 0xffffffff); - - /* ports can be tagged with various classes. see closeAllSockets(tag). defaults to 0. */ - unsigned tag; - - friend class PiggyBackData; - }; + typedef AtomicUInt MSGID; enum Operations { opReply = 1, /* reply. responseTo is set. */ @@ -425,17 +288,9 @@ namespace mongo { return _freeIt; } - void send( MessagingPort &p, const char *context ) { - if ( empty() ) { - return; - } - if ( _buf != 0 ) { - p.send( (char*)_buf, _buf->len, context ); - } - else { - p.send( _data, context ); - } - } + void send( MessagingPort &p, const char *context ); + + string toString() const; private: void _setData( MsgData *d, bool freeIt ) { @@ -450,59 +305,8 @@ namespace mongo { bool _freeIt; }; - class SocketException : public DBException { - public: - const enum Type { CLOSED , RECV_ERROR , SEND_ERROR, RECV_TIMEOUT, SEND_TIMEOUT, FAILED_STATE, CONNECT_ERROR } _type; - - SocketException( Type t , string server="" , int code = 9001 , string extra="" ) : DBException( "socket exception" , code ) , _type(t) , _server(server), _extra(extra){ } - virtual ~SocketException() throw() {} - - bool shouldPrint() const { return _type != CLOSED; } - virtual string toString() const; - - private: - string _server; - string _extra; - }; MSGID nextMessageId(); - extern TicketHolder connTicketHolder; - - class ElapsedTracker { - public: - ElapsedTracker( int hitsBetweenMarks , int msBetweenMarks ) - : _h( hitsBetweenMarks ) , _ms( msBetweenMarks ) , _pings(0) { - _last = Listener::getElapsedTimeMillis(); - } - - /** - * call this for every iteration - * returns true if one of the triggers has gone off - */ - bool ping() { - if ( ( ++_pings % _h ) == 0 ) { - _last = Listener::getElapsedTimeMillis(); - return true; - } - - long long now = Listener::getElapsedTimeMillis(); - if ( now - _last > _ms ) { - _last = now; - return true; - } - - return false; - } - - private: - int _h; - int _ms; - - unsigned long long _pings; - - long long _last; - - }; } // namespace mongo diff --git a/util/net/message_port.cpp b/util/net/message_port.cpp new file mode 100644 index 0000000..9abfaf7 --- /dev/null +++ b/util/net/message_port.cpp @@ -0,0 +1,298 @@ +// message_port.cpp + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "pch.h" + +#include <fcntl.h> +#include <errno.h> +#include <time.h> + +#include "message.h" +#include "message_port.h" +#include "listen.h" + +#include "../goodies.h" +#include "../background.h" +#include "../time_support.h" +#include "../../db/cmdline.h" +#include "../../client/dbclient.h" + + +#ifndef _WIN32 +# ifndef __sunos__ +# include <ifaddrs.h> +# endif +# include <sys/resource.h> +# include <sys/stat.h> +#else + +// errno doesn't work for winsock. +#undef errno +#define errno WSAGetLastError() + +#endif + +namespace mongo { + + +// if you want trace output: +#define mmm(x) + + /* messagingport -------------------------------------------------------------- */ + + class PiggyBackData { + public: + PiggyBackData( MessagingPort * port ) { + _port = port; + _buf = new char[1300]; + _cur = _buf; + } + + ~PiggyBackData() { + DESTRUCTOR_GUARD ( + flush(); + delete[]( _cur ); + ); + } + + void append( Message& m ) { + assert( m.header()->len <= 1300 ); + + if ( len() + m.header()->len > 1300 ) + flush(); + + memcpy( _cur , m.singleData() , m.header()->len ); + _cur += m.header()->len; + } + + void flush() { + if ( _buf == _cur ) + return; + + _port->send( _buf , len(), "flush" ); + _cur = _buf; + } + + int len() const { return _cur - _buf; } + + private: + MessagingPort* _port; + char * _buf; + char * _cur; + }; + + class Ports { + set<MessagingPort*> ports; + mongo::mutex m; + public: + Ports() : ports(), m("Ports") {} + void closeAll(unsigned skip_mask) { + scoped_lock bl(m); + for ( set<MessagingPort*>::iterator i = ports.begin(); i != ports.end(); i++ ) { + if( (*i)->tag & skip_mask ) + continue; + (*i)->shutdown(); + } + } + void insert(MessagingPort* p) { + scoped_lock bl(m); + ports.insert(p); + } + void erase(MessagingPort* p) { + scoped_lock bl(m); + ports.erase(p); + } + }; + + // we "new" this so it is still be around when other automatic global vars + // are being destructed during termination. + Ports& ports = *(new Ports()); + + void MessagingPort::closeAllSockets(unsigned mask) { + ports.closeAll(mask); + } + + MessagingPort::MessagingPort(int fd, const SockAddr& remote) + : Socket( fd , remote ) , piggyBackData(0) { + ports.insert(this); + } + + MessagingPort::MessagingPort( double timeout, int ll ) + : Socket( timeout, ll ) { + ports.insert(this); + piggyBackData = 0; + } + + MessagingPort::MessagingPort( Socket& sock ) + : Socket( sock ) , piggyBackData( 0 ) { + } + + void MessagingPort::shutdown() { + close(); + } + + MessagingPort::~MessagingPort() { + if ( piggyBackData ) + delete( piggyBackData ); + shutdown(); + ports.erase(this); + } + + bool MessagingPort::recv(Message& m) { + try { +again: + mmm( log() << "* recv() sock:" << this->sock << endl; ) + int len = -1; + + char *lenbuf = (char *) &len; + int lft = 4; + Socket::recv( lenbuf, lft ); + + if ( len < 16 || len > 48000000 ) { // messages must be large enough for headers + if ( len == -1 ) { + // Endian check from the client, after connecting, to see what mode server is running in. + unsigned foo = 0x10203040; + send( (char *) &foo, 4, "endian" ); + goto again; + } + + if ( len == 542393671 ) { + // an http GET + log(_logLevel) << "looks like you're trying to access db over http on native driver port. please add 1000 for webserver" << endl; + string msg = "You are trying to access MongoDB on the native driver port. For http diagnostic access, add 1000 to the port number\n"; + stringstream ss; + ss << "HTTP/1.0 200 OK\r\nConnection: close\r\nContent-Type: text/plain\r\nContent-Length: " << msg.size() << "\r\n\r\n" << msg; + string s = ss.str(); + send( s.c_str(), s.size(), "http" ); + return false; + } + log(0) << "recv(): message len " << len << " is too large" << len << endl; + return false; + } + + int z = (len+1023)&0xfffffc00; + assert(z>=len); + MsgData *md = (MsgData *) malloc(z); + assert(md); + md->len = len; + + char *p = (char *) &md->id; + int left = len -4; + + try { + Socket::recv( p, left ); + } + catch (...) { + free(md); + throw; + } + + m.setData(md, true); + return true; + + } + catch ( const SocketException & e ) { + log(_logLevel + (e.shouldPrint() ? 0 : 1) ) << "SocketException: remote: " << remote() << " error: " << e << endl; + m.reset(); + return false; + } + } + + void MessagingPort::reply(Message& received, Message& response) { + say(/*received.from, */response, received.header()->id); + } + + void MessagingPort::reply(Message& received, Message& response, MSGID responseTo) { + say(/*received.from, */response, responseTo); + } + + bool MessagingPort::call(Message& toSend, Message& response) { + mmm( log() << "*call()" << endl; ) + say(toSend); + return recv( toSend , response ); + } + + bool MessagingPort::recv( const Message& toSend , Message& response ) { + while ( 1 ) { + bool ok = recv(response); + if ( !ok ) + return false; + //log() << "got response: " << response.data->responseTo << endl; + if ( response.header()->responseTo == toSend.header()->id ) + break; + error() << "MessagingPort::call() wrong id got:" << hex << (unsigned)response.header()->responseTo << " expect:" << (unsigned)toSend.header()->id << '\n' + << dec + << " toSend op: " << (unsigned)toSend.operation() << '\n' + << " response msgid:" << (unsigned)response.header()->id << '\n' + << " response len: " << (unsigned)response.header()->len << '\n' + << " response op: " << response.operation() << '\n' + << " remote: " << remoteString() << endl; + assert(false); + response.reset(); + } + mmm( log() << "*call() end" << endl; ) + return true; + } + + void MessagingPort::say(Message& toSend, int responseTo) { + assert( !toSend.empty() ); + mmm( log() << "* say() sock:" << this->sock << " thr:" << GetCurrentThreadId() << endl; ) + toSend.header()->id = nextMessageId(); + toSend.header()->responseTo = responseTo; + + if ( piggyBackData && piggyBackData->len() ) { + mmm( log() << "* have piggy back" << endl; ) + if ( ( piggyBackData->len() + toSend.header()->len ) > 1300 ) { + // won't fit in a packet - so just send it off + piggyBackData->flush(); + } + else { + piggyBackData->append( toSend ); + piggyBackData->flush(); + return; + } + } + + toSend.send( *this, "say" ); + } + + void MessagingPort::piggyBack( Message& toSend , int responseTo ) { + + if ( toSend.header()->len > 1300 ) { + // not worth saving because its almost an entire packet + say( toSend ); + return; + } + + // we're going to be storing this, so need to set it up + toSend.header()->id = nextMessageId(); + toSend.header()->responseTo = responseTo; + + if ( ! piggyBackData ) + piggyBackData = new PiggyBackData( this ); + + piggyBackData->append( toSend ); + } + + HostAndPort MessagingPort::remote() const { + if ( ! _remoteParsed.hasPort() ) + _remoteParsed = HostAndPort( remoteAddr() ); + return _remoteParsed; + } + + +} // namespace mongo diff --git a/util/net/message_port.h b/util/net/message_port.h new file mode 100644 index 0000000..22ecafe --- /dev/null +++ b/util/net/message_port.h @@ -0,0 +1,107 @@ +// message_port.h + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "sock.h" +#include "message.h" + +namespace mongo { + + class MessagingPort; + class PiggyBackData; + + typedef AtomicUInt MSGID; + + class AbstractMessagingPort : boost::noncopyable { + public: + AbstractMessagingPort() : tag(0) {} + virtual ~AbstractMessagingPort() { } + virtual void reply(Message& received, Message& response, MSGID responseTo) = 0; // like the reply below, but doesn't rely on received.data still being available + virtual void reply(Message& received, Message& response) = 0; + + virtual HostAndPort remote() const = 0; + virtual unsigned remotePort() const = 0; + + private: + + public: + // TODO make this private with some helpers + + /* ports can be tagged with various classes. see closeAllSockets(tag). defaults to 0. */ + unsigned tag; + + }; + + class MessagingPort : public AbstractMessagingPort , public Socket { + public: + MessagingPort(int fd, const SockAddr& remote); + + // in some cases the timeout will actually be 2x this value - eg we do a partial send, + // then the timeout fires, then we try to send again, then the timeout fires again with + // no data sent, then we detect that the other side is down + MessagingPort(double so_timeout = 0, int logLevel = 0 ); + + MessagingPort(Socket& socket); + + virtual ~MessagingPort(); + + void shutdown(); + + /* it's assumed if you reuse a message object, that it doesn't cross MessagingPort's. + also, the Message data will go out of scope on the subsequent recv call. + */ + bool recv(Message& m); + void reply(Message& received, Message& response, MSGID responseTo); + void reply(Message& received, Message& response); + bool call(Message& toSend, Message& response); + + void say(Message& toSend, int responseTo = -1); + + /** + * this is used for doing 'async' queries + * instead of doing call( to , from ) + * you would do + * say( to ) + * recv( from ) + * Note: if you fail to call recv and someone else uses this port, + * horrible things will happend + */ + bool recv( const Message& sent , Message& response ); + + void piggyBack( Message& toSend , int responseTo = -1 ); + + unsigned remotePort() const { return Socket::remotePort(); } + virtual HostAndPort remote() const; + + + private: + + PiggyBackData * piggyBackData; + + // this is the parsed version of remote + // mutable because its initialized only on call to remote() + mutable HostAndPort _remoteParsed; + + public: + static void closeAllSockets(unsigned tagMask = 0xffffffff); + + friend class PiggyBackData; + }; + + +} // namespace mongo diff --git a/util/message_server.h b/util/net/message_server.h index defae0b..ae77b97 100644 --- a/util/message_server.h +++ b/util/net/message_server.h @@ -22,16 +22,28 @@ #pragma once -#include "../pch.h" +#include "../../pch.h" namespace mongo { class MessageHandler { public: virtual ~MessageHandler() {} - + + /** + * called once when a socket is connected + */ virtual void connected( AbstractMessagingPort* p ) = 0; + + /** + * called every time a message comes in + * handler is responsible for responding to client + */ virtual void process( Message& m , AbstractMessagingPort* p , LastError * err ) = 0; + + /** + * called once when a socket is disconnected + */ virtual void disconnected( AbstractMessagingPort* p ) = 0; }; diff --git a/util/message_server_asio.cpp b/util/net/message_server_asio.cpp index 0c6a7d9..0c6a7d9 100644 --- a/util/message_server_asio.cpp +++ b/util/net/message_server_asio.cpp diff --git a/util/message_server_port.cpp b/util/net/message_server_port.cpp index 409b0c7..ca0b13d 100644 --- a/util/message_server_port.cpp +++ b/util/net/message_server_port.cpp @@ -20,13 +20,15 @@ #ifndef USE_ASIO #include "message.h" +#include "message_port.h" #include "message_server.h" +#include "listen.h" -#include "../db/cmdline.h" -#include "../db/lasterror.h" -#include "../db/stats/counters.h" +#include "../../db/cmdline.h" +#include "../../db/lasterror.h" +#include "../../db/stats/counters.h" -#ifdef __linux__ +#ifdef __linux__ // TODO: consider making this ifndef _WIN32 # include <sys/resource.h> #endif @@ -38,13 +40,15 @@ namespace mongo { void threadRun( MessagingPort * inPort) { TicketHolderReleaser connTicketReleaser( &connTicketHolder ); - - assert( inPort ); setThreadName( "conn" ); - + + assert( inPort ); + inPort->setLogLevel(1); scoped_ptr<MessagingPort> p( inPort ); + p->postFork(); + string otherSide; Message m; @@ -52,11 +56,11 @@ namespace mongo { LastError * le = new LastError(); lastError.reset( le ); // lastError now has ownership - otherSide = p->farEnd.toString(); + otherSide = p->remoteString(); handler->connected( p.get() ); - while ( 1 ) { + while ( ! inShutdown() ) { m.reset(); p->clearCounters(); @@ -71,14 +75,25 @@ namespace mongo { networkCounter.hit( p->getBytesIn() , p->getBytesOut() ); } } - catch ( const SocketException& ) { - log() << "unclean socket shutdown from: " << otherSide << endl; + catch ( AssertionException& e ) { + log() << "AssertionException handling request, closing client connection: " << e << endl; + p->shutdown(); + } + catch ( SocketException& e ) { + log() << "SocketException handling request, closing client connection: " << e << endl; + p->shutdown(); } - catch ( const std::exception& e ) { - problem() << "uncaught exception (" << e.what() << ")(" << demangleName( typeid(e) ) <<") in PortMessageServer::threadRun, closing connection" << endl; + catch ( const ClockSkewException & ) { + log() << "ClockSkewException - shutting down" << endl; + exitCleanly( EXIT_CLOCK_SKEW ); + } + catch ( std::exception &e ) { + error() << "Uncaught std::exception: " << e.what() << ", terminating" << endl; + dbexit( EXIT_UNCAUGHT ); } catch ( ... ) { - problem() << "uncaught exception in PortMessageServer::threadRun, closing connection" << endl; + error() << "Uncaught exception, terminating" << endl; + dbexit( EXIT_UNCAUGHT ); } handler->disconnected( p.get() ); @@ -89,7 +104,7 @@ namespace mongo { class PortMessageServer : public MessageServer , public Listener { public: PortMessageServer( const MessageServer::Options& opts, MessageHandler * handler ) : - Listener( opts.ipList, opts.port ) { + Listener( "" , opts.ipList, opts.port ) { uassert( 10275 , "multiple PortMessageServer not supported" , ! pms::handler ); pms::handler = handler; @@ -116,19 +131,19 @@ namespace mongo { pthread_attr_init(&attrs); pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED); - static const size_t STACK_SIZE = 4*1024*1024; + static const size_t STACK_SIZE = 1024*1024; // if we change this we need to update the warning struct rlimit limits; - assert(getrlimit(RLIMIT_STACK, &limits) == 0); + verify(15887, getrlimit(RLIMIT_STACK, &limits) == 0); if (limits.rlim_cur > STACK_SIZE) { pthread_attr_setstacksize(&attrs, (DEBUG_BUILD ? (STACK_SIZE / 2) : STACK_SIZE)); - } - else if (limits.rlim_cur < 1024*1024) { - warning() << "Stack size set to " << (limits.rlim_cur/1024) << "KB. We suggest at least 1MB" << endl; + } else if (limits.rlim_cur < 1024*1024) { + warning() << "Stack size set to " << (limits.rlim_cur/1024) << "KB. We suggest 1MB" << endl; } + pthread_t thread; int failed = pthread_create(&thread, &attrs, (void*(*)(void*)) &pms::threadRun, p); @@ -149,6 +164,16 @@ namespace mongo { sleepmillis(2); } + catch ( ... ) { + connTicketHolder.release(); + log() << "unknown error accepting new socket" << endl; + + p->shutdown(); + delete p; + + sleepmillis(2); + } + } virtual void setAsTimeTracker() { @@ -159,6 +184,7 @@ namespace mongo { initAndListen(); } + virtual bool useUnixSockets() const { return true; } }; diff --git a/util/miniwebserver.cpp b/util/net/miniwebserver.cpp index e700112..0793100 100644 --- a/util/miniwebserver.cpp +++ b/util/net/miniwebserver.cpp @@ -17,14 +17,14 @@ #include "pch.h" #include "miniwebserver.h" -#include "hex.h" +#include "../hex.h" -#include <pcrecpp.h> +#include "pcrecpp.h" namespace mongo { - MiniWebServer::MiniWebServer(const string &ip, int port) - : Listener(ip, port, false) + MiniWebServer::MiniWebServer(const string& name, const string &ip, int port) + : Listener(name, ip, port, false) {} string MiniWebServer::parseURL( const char * buf ) { @@ -108,17 +108,18 @@ namespace mongo { return false; } - void MiniWebServer::accepted(int s, const SockAddr &from) { - setSockTimeouts(s, 8); + void MiniWebServer::accepted(Socket sock) { + sock.postFork(); + sock.setTimeout(8); char buf[4096]; int len = 0; while ( 1 ) { int left = sizeof(buf) - 1 - len; if( left == 0 ) break; - int x = ::recv(s, buf + len, left, 0); + int x = sock.unsafe_recv( buf + len , left ); if ( x <= 0 ) { - closesocket(s); + sock.close(); return; } len += x; @@ -134,7 +135,7 @@ namespace mongo { vector<string> headers; try { - doRequest(buf, parseURL( buf ), responseMsg, responseCode, headers, from); + doRequest(buf, parseURL( buf ), responseMsg, responseCode, headers, sock.remoteAddr() ); } catch ( std::exception& e ) { responseCode = 500; @@ -165,8 +166,8 @@ namespace mongo { ss << responseMsg; string response = ss.str(); - ::send(s, response.c_str(), response.size(), 0); - closesocket(s); + sock.send( response.c_str(), response.size() , "http response" ); + sock.close(); } string MiniWebServer::getHeader( const char * req , string wanted ) { diff --git a/util/miniwebserver.h b/util/net/miniwebserver.h index b385afc..1fb6b3f 100644 --- a/util/miniwebserver.h +++ b/util/net/miniwebserver.h @@ -17,15 +17,17 @@ #pragma once -#include "../pch.h" +#include "../../pch.h" #include "message.h" -#include "../db/jsobj.h" +#include "message_port.h" +#include "listen.h" +#include "../../db/jsobj.h" namespace mongo { class MiniWebServer : public Listener { public: - MiniWebServer(const string &ip, int _port); + MiniWebServer(const string& name, const string &ip, int _port); virtual ~MiniWebServer() {} virtual void doRequest( @@ -51,7 +53,7 @@ namespace mongo { static string urlDecode(string s) {return urlDecode(s.c_str());} private: - void accepted(int s, const SockAddr &from); + void accepted(Socket socket); static bool fullReceive( const char *buf ); }; diff --git a/util/net/sock.cpp b/util/net/sock.cpp new file mode 100644 index 0000000..69c42f2 --- /dev/null +++ b/util/net/sock.cpp @@ -0,0 +1,713 @@ +// @file sock.cpp + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "pch.h" +#include "sock.h" +#include "../background.h" + +#if !defined(_WIN32) +# include <sys/socket.h> +# include <sys/types.h> +# include <sys/socket.h> +# include <sys/un.h> +# include <netinet/in.h> +# include <netinet/tcp.h> +# include <arpa/inet.h> +# include <errno.h> +# include <netdb.h> +# if defined(__openbsd__) +# include <sys/uio.h> +# endif +#endif + +#ifdef MONGO_SSL +#include <openssl/err.h> +#include <openssl/ssl.h> +#endif + + +namespace mongo { + + static bool ipv6 = false; + void enableIPv6(bool state) { ipv6 = state; } + bool IPv6Enabled() { return ipv6; } + + void setSockTimeouts(int sock, double secs) { + struct timeval tv; + tv.tv_sec = (int)secs; + tv.tv_usec = (int)((long long)(secs*1000*1000) % (1000*1000)); + bool report = logLevel > 3; // solaris doesn't provide these + DEV report = true; + bool ok = setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (char *) &tv, sizeof(tv) ) == 0; + if( report && !ok ) log() << "unabled to set SO_RCVTIMEO" << endl; + ok = setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, (char *) &tv, sizeof(tv) ) == 0; + DEV if( report && !ok ) log() << "unabled to set SO_RCVTIMEO" << endl; + } + +#if defined(_WIN32) + void disableNagle(int sock) { + int x = 1; + if ( setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char *) &x, sizeof(x)) ) + error() << "disableNagle failed" << endl; + if ( setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (char *) &x, sizeof(x)) ) + error() << "SO_KEEPALIVE failed" << endl; + } +#else + + void disableNagle(int sock) { + int x = 1; + +#ifdef SOL_TCP + int level = SOL_TCP; +#else + int level = SOL_SOCKET; +#endif + + if ( setsockopt(sock, level, TCP_NODELAY, (char *) &x, sizeof(x)) ) + error() << "disableNagle failed: " << errnoWithDescription() << endl; + +#ifdef SO_KEEPALIVE + if ( setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (char *) &x, sizeof(x)) ) + error() << "SO_KEEPALIVE failed: " << errnoWithDescription() << endl; + +# ifdef __linux__ + socklen_t len = sizeof(x); + if ( getsockopt(sock, level, TCP_KEEPIDLE, (char *) &x, &len) ) + error() << "can't get TCP_KEEPIDLE: " << errnoWithDescription() << endl; + + if (x > 300) { + x = 300; + if ( setsockopt(sock, level, TCP_KEEPIDLE, (char *) &x, sizeof(x)) ) { + error() << "can't set TCP_KEEPIDLE: " << errnoWithDescription() << endl; + } + } + + len = sizeof(x); // just in case it changed + if ( getsockopt(sock, level, TCP_KEEPINTVL, (char *) &x, &len) ) + error() << "can't get TCP_KEEPINTVL: " << errnoWithDescription() << endl; + + if (x > 300) { + x = 300; + if ( setsockopt(sock, level, TCP_KEEPINTVL, (char *) &x, sizeof(x)) ) { + error() << "can't set TCP_KEEPINTVL: " << errnoWithDescription() << endl; + } + } +# endif +#endif + + } + +#endif + + string getAddrInfoStrError(int code) { +#if !defined(_WIN32) + return gai_strerror(code); +#else + /* gai_strerrorA is not threadsafe on windows. don't use it. */ + return errnoWithDescription(code); +#endif + } + + + // --- SockAddr + + SockAddr::SockAddr(int sourcePort) { + memset(as<sockaddr_in>().sin_zero, 0, sizeof(as<sockaddr_in>().sin_zero)); + as<sockaddr_in>().sin_family = AF_INET; + as<sockaddr_in>().sin_port = htons(sourcePort); + as<sockaddr_in>().sin_addr.s_addr = htonl(INADDR_ANY); + addressSize = sizeof(sockaddr_in); + } + + SockAddr::SockAddr(const char * iporhost , int port) { + if (!strcmp(iporhost, "localhost")) + iporhost = "127.0.0.1"; + + if (strchr(iporhost, '/')) { +#ifdef _WIN32 + uassert(13080, "no unix socket support on windows", false); +#endif + uassert(13079, "path to unix socket too long", strlen(iporhost) < sizeof(as<sockaddr_un>().sun_path)); + as<sockaddr_un>().sun_family = AF_UNIX; + strcpy(as<sockaddr_un>().sun_path, iporhost); + addressSize = sizeof(sockaddr_un); + } + else { + addrinfo* addrs = NULL; + addrinfo hints; + memset(&hints, 0, sizeof(addrinfo)); + hints.ai_socktype = SOCK_STREAM; + //hints.ai_flags = AI_ADDRCONFIG; // This is often recommended but don't do it. SERVER-1579 + hints.ai_flags |= AI_NUMERICHOST; // first pass tries w/o DNS lookup + hints.ai_family = (IPv6Enabled() ? AF_UNSPEC : AF_INET); + + StringBuilder ss; + ss << port; + int ret = getaddrinfo(iporhost, ss.str().c_str(), &hints, &addrs); + + // old C compilers on IPv6-capable hosts return EAI_NODATA error +#ifdef EAI_NODATA + int nodata = (ret == EAI_NODATA); +#else + int nodata = false; +#endif + if (ret == EAI_NONAME || nodata) { + // iporhost isn't an IP address, allow DNS lookup + hints.ai_flags &= ~AI_NUMERICHOST; + ret = getaddrinfo(iporhost, ss.str().c_str(), &hints, &addrs); + } + + if (ret) { + // don't log if this as it is a CRT construction and log() may not work yet. + if( strcmp("0.0.0.0", iporhost) ) { + log() << "getaddrinfo(\"" << iporhost << "\") failed: " << gai_strerror(ret) << endl; + } + *this = SockAddr(port); + } + else { + //TODO: handle other addresses in linked list; + assert(addrs->ai_addrlen <= sizeof(sa)); + memcpy(&sa, addrs->ai_addr, addrs->ai_addrlen); + addressSize = addrs->ai_addrlen; + freeaddrinfo(addrs); + } + } + } + + bool SockAddr::isLocalHost() const { + switch (getType()) { + case AF_INET: return getAddr() == "127.0.0.1"; + case AF_INET6: return getAddr() == "::1"; + case AF_UNIX: return true; + default: return false; + } + assert(false); + return false; + } + + string SockAddr::toString(bool includePort) const { + string out = getAddr(); + if (includePort && getType() != AF_UNIX && getType() != AF_UNSPEC) + out += mongoutils::str::stream() << ':' << getPort(); + return out; + } + + sa_family_t SockAddr::getType() const { + return sa.ss_family; + } + + unsigned SockAddr::getPort() const { + switch (getType()) { + case AF_INET: return ntohs(as<sockaddr_in>().sin_port); + case AF_INET6: return ntohs(as<sockaddr_in6>().sin6_port); + case AF_UNIX: return 0; + case AF_UNSPEC: return 0; + default: massert(SOCK_FAMILY_UNKNOWN_ERROR, "unsupported address family", false); return 0; + } + } + + string SockAddr::getAddr() const { + switch (getType()) { + case AF_INET: + case AF_INET6: { + const int buflen=128; + char buffer[buflen]; + int ret = getnameinfo(raw(), addressSize, buffer, buflen, NULL, 0, NI_NUMERICHOST); + massert(13082, getAddrInfoStrError(ret), ret == 0); + return buffer; + } + + case AF_UNIX: return (addressSize > 2 ? as<sockaddr_un>().sun_path : "anonymous unix socket"); + case AF_UNSPEC: return "(NONE)"; + default: massert(SOCK_FAMILY_UNKNOWN_ERROR, "unsupported address family", false); return ""; + } + } + + bool SockAddr::operator==(const SockAddr& r) const { + if (getType() != r.getType()) + return false; + + if (getPort() != r.getPort()) + return false; + + switch (getType()) { + case AF_INET: return as<sockaddr_in>().sin_addr.s_addr == r.as<sockaddr_in>().sin_addr.s_addr; + case AF_INET6: return memcmp(as<sockaddr_in6>().sin6_addr.s6_addr, r.as<sockaddr_in6>().sin6_addr.s6_addr, sizeof(in6_addr)) == 0; + case AF_UNIX: return strcmp(as<sockaddr_un>().sun_path, r.as<sockaddr_un>().sun_path) == 0; + case AF_UNSPEC: return true; // assume all unspecified addresses are the same + default: massert(SOCK_FAMILY_UNKNOWN_ERROR, "unsupported address family", false); + } + return false; + } + + bool SockAddr::operator!=(const SockAddr& r) const { + return !(*this == r); + } + + bool SockAddr::operator<(const SockAddr& r) const { + if (getType() < r.getType()) + return true; + else if (getType() > r.getType()) + return false; + + if (getPort() < r.getPort()) + return true; + else if (getPort() > r.getPort()) + return false; + + switch (getType()) { + case AF_INET: return as<sockaddr_in>().sin_addr.s_addr < r.as<sockaddr_in>().sin_addr.s_addr; + case AF_INET6: return memcmp(as<sockaddr_in6>().sin6_addr.s6_addr, r.as<sockaddr_in6>().sin6_addr.s6_addr, sizeof(in6_addr)) < 0; + case AF_UNIX: return strcmp(as<sockaddr_un>().sun_path, r.as<sockaddr_un>().sun_path) < 0; + case AF_UNSPEC: return false; + default: massert(SOCK_FAMILY_UNKNOWN_ERROR, "unsupported address family", false); + } + return false; + } + + SockAddr unknownAddress( "0.0.0.0", 0 ); + + // ------ hostname ------------------- + + string hostbyname(const char *hostname) { + string addr = SockAddr(hostname, 0).getAddr(); + if (addr == "0.0.0.0") + return ""; + else + return addr; + } + + // --- my -- + + string getHostName() { + char buf[256]; + int ec = gethostname(buf, 127); + if ( ec || *buf == 0 ) { + log() << "can't get this server's hostname " << errnoWithDescription() << endl; + return ""; + } + return buf; + } + + + string _hostNameCached; + static void _hostNameCachedInit() { + _hostNameCached = getHostName(); + } + boost::once_flag _hostNameCachedInitFlags = BOOST_ONCE_INIT; + + string getHostNameCached() { + boost::call_once( _hostNameCachedInit , _hostNameCachedInitFlags ); + return _hostNameCached; + } + + // --------- SocketException ---------- + +#ifdef MSG_NOSIGNAL + const int portSendFlags = MSG_NOSIGNAL; + const int portRecvFlags = MSG_NOSIGNAL; +#else + const int portSendFlags = 0; + const int portRecvFlags = 0; +#endif + + string SocketException::toString() const { + stringstream ss; + ss << _ei.code << " socket exception [" << _type << "] "; + + if ( _server.size() ) + ss << "server [" << _server << "] "; + + if ( _extra.size() ) + ss << _extra; + + return ss.str(); + } + + + // ------------ SSLManager ----------------- + +#ifdef MONGO_SSL + SSLManager::SSLManager( bool client ) { + _client = client; + SSL_library_init(); + SSL_load_error_strings(); + ERR_load_crypto_strings(); + + _context = SSL_CTX_new( client ? SSLv23_client_method() : SSLv23_server_method() ); + massert( 15864 , mongoutils::str::stream() << "can't create SSL Context: " << ERR_error_string(ERR_get_error(), NULL) , _context ); + + SSL_CTX_set_options( _context, SSL_OP_ALL); + } + + void SSLManager::setupPubPriv( const string& privateKeyFile , const string& publicKeyFile ) { + massert( 15865 , + mongoutils::str::stream() << "Can't read SSL certificate from file " + << publicKeyFile << ":" << ERR_error_string(ERR_get_error(), NULL) , + SSL_CTX_use_certificate_file(_context, publicKeyFile.c_str(), SSL_FILETYPE_PEM) ); + + + massert( 15866 , + mongoutils::str::stream() << "Can't read SSL private key from file " + << privateKeyFile << " : " << ERR_error_string(ERR_get_error(), NULL) , + SSL_CTX_use_PrivateKey_file(_context, privateKeyFile.c_str(), SSL_FILETYPE_PEM) ); + } + + + int SSLManager::password_cb(char *buf,int num, int rwflag,void *userdata){ + SSLManager* sm = (SSLManager*)userdata; + string pass = sm->_password; + strcpy(buf,pass.c_str()); + return(pass.size()); + } + + void SSLManager::setupPEM( const string& keyFile , const string& password ) { + _password = password; + + massert( 15867 , "Can't read certificate file" , SSL_CTX_use_certificate_chain_file( _context , keyFile.c_str() ) ); + + SSL_CTX_set_default_passwd_cb_userdata( _context , this ); + SSL_CTX_set_default_passwd_cb( _context, &SSLManager::password_cb ); + + massert( 15868 , "Can't read key file" , SSL_CTX_use_PrivateKey_file( _context , keyFile.c_str() , SSL_FILETYPE_PEM ) ); + } + + SSL * SSLManager::secure( int fd ) { + SSL * ssl = SSL_new( _context ); + massert( 15861 , "can't create SSL" , ssl ); + SSL_set_fd( ssl , fd ); + return ssl; + } + + +#endif + + // ------------ Socket ----------------- + + Socket::Socket(int fd , const SockAddr& remote) : + _fd(fd), _remote(remote), _timeout(0) { + _logLevel = 0; + _init(); + } + + Socket::Socket( double timeout, int ll ) { + _logLevel = ll; + _fd = -1; + _timeout = timeout; + _init(); + } + + void Socket::_init() { + _bytesOut = 0; + _bytesIn = 0; +#ifdef MONGO_SSL + _sslAccepted = 0; +#endif + } + + void Socket::close() { +#ifdef MONGO_SSL + _ssl.reset(); +#endif + if ( _fd >= 0 ) { + closesocket( _fd ); + _fd = -1; + } + } + +#ifdef MONGO_SSL + void Socket::secure( SSLManager * ssl ) { + assert( ssl ); + assert( _fd >= 0 ); + _ssl.reset( ssl->secure( _fd ) ); + SSL_connect( _ssl.get() ); + } + + void Socket::secureAccepted( SSLManager * ssl ) { + _sslAccepted = ssl; + } +#endif + + void Socket::postFork() { +#ifdef MONGO_SSL + if ( _sslAccepted ) { + assert( _fd ); + _ssl.reset( _sslAccepted->secure( _fd ) ); + SSL_accept( _ssl.get() ); + _sslAccepted = 0; + } +#endif + } + + class ConnectBG : public BackgroundJob { + public: + ConnectBG(int sock, SockAddr remote) : _sock(sock), _remote(remote) { } + + void run() { _res = ::connect(_sock, _remote.raw(), _remote.addressSize); } + string name() const { return "ConnectBG"; } + int inError() const { return _res; } + + private: + int _sock; + int _res; + SockAddr _remote; + }; + + bool Socket::connect(SockAddr& remote) { + _remote = remote; + + _fd = socket(remote.getType(), SOCK_STREAM, 0); + if ( _fd == INVALID_SOCKET ) { + log(_logLevel) << "ERROR: connect invalid socket " << errnoWithDescription() << endl; + return false; + } + + if ( _timeout > 0 ) { + setTimeout( _timeout ); + } + + ConnectBG bg(_fd, remote); + bg.go(); + if ( bg.wait(5000) ) { + if ( bg.inError() ) { + close(); + return false; + } + } + else { + // time out the connect + close(); + bg.wait(); // so bg stays in scope until bg thread terminates + return false; + } + + if (remote.getType() != AF_UNIX) + disableNagle(_fd); + +#ifdef SO_NOSIGPIPE + // osx + const int one = 1; + setsockopt( _fd , SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(int)); +#endif + + return true; + } + + int Socket::_send( const char * data , int len ) { +#ifdef MONGO_SSL + if ( _ssl ) { + return SSL_write( _ssl.get() , data , len ); + } +#endif + return ::send( _fd , data , len , portSendFlags ); + } + + // sends all data or throws an exception + void Socket::send( const char * data , int len, const char *context ) { + while( len > 0 ) { + int ret = _send( data , len ); + if ( ret == -1 ) { + +#ifdef MONGO_SSL + if ( _ssl ) { + log() << "SSL Error ret: " << ret << " err: " << SSL_get_error( _ssl.get() , ret ) + << " " << ERR_error_string(ERR_get_error(), NULL) + << endl; + } +#endif + +#if defined(_WIN32) + if ( WSAGetLastError() == WSAETIMEDOUT && _timeout != 0 ) { +#else + if ( ( errno == EAGAIN || errno == EWOULDBLOCK ) && _timeout != 0 ) { +#endif + log(_logLevel) << "Socket " << context << " send() timed out " << _remote.toString() << endl; + throw SocketException( SocketException::SEND_TIMEOUT , remoteString() ); + } + else { + SocketException::Type t = SocketException::SEND_ERROR; + log(_logLevel) << "Socket " << context << " send() " + << errnoWithDescription() << ' ' << remoteString() << endl; + throw SocketException( t , remoteString() ); + } + } + else { + _bytesOut += ret; + + assert( ret <= len ); + len -= ret; + data += ret; + } + } + } + + void Socket::_send( const vector< pair< char *, int > > &data, const char *context ) { + for( vector< pair< char *, int > >::const_iterator i = data.begin(); i != data.end(); ++i ) { + char * data = i->first; + int len = i->second; + send( data, len, context ); + } + } + + // sends all data or throws an exception + void Socket::send( const vector< pair< char *, int > > &data, const char *context ) { + +#ifdef MONGO_SSL + if ( _ssl ) { + _send( data , context ); + return; + } +#endif + +#if defined(_WIN32) + // TODO use scatter/gather api + _send( data , context ); +#else + vector< struct iovec > d( data.size() ); + int i = 0; + for( vector< pair< char *, int > >::const_iterator j = data.begin(); j != data.end(); ++j ) { + if ( j->second > 0 ) { + d[ i ].iov_base = j->first; + d[ i ].iov_len = j->second; + ++i; + _bytesOut += j->second; + } + } + struct msghdr meta; + memset( &meta, 0, sizeof( meta ) ); + meta.msg_iov = &d[ 0 ]; + meta.msg_iovlen = d.size(); + + while( meta.msg_iovlen > 0 ) { + int ret = ::sendmsg( _fd , &meta , portSendFlags ); + if ( ret == -1 ) { + if ( errno != EAGAIN || _timeout == 0 ) { + log(_logLevel) << "Socket " << context << " send() " << errnoWithDescription() << ' ' << remoteString() << endl; + throw SocketException( SocketException::SEND_ERROR , remoteString() ); + } + else { + log(_logLevel) << "Socket " << context << " send() remote timeout " << remoteString() << endl; + throw SocketException( SocketException::SEND_TIMEOUT , remoteString() ); + } + } + else { + struct iovec *& i = meta.msg_iov; + while( ret > 0 ) { + if ( i->iov_len > unsigned( ret ) ) { + i->iov_len -= ret; + i->iov_base = (char*)(i->iov_base) + ret; + ret = 0; + } + else { + ret -= i->iov_len; + ++i; + --(meta.msg_iovlen); + } + } + } + } +#endif + } + + void Socket::recv( char * buf , int len ) { + unsigned retries = 0; + while( len > 0 ) { + int ret = unsafe_recv( buf , len ); + if ( ret > 0 ) { + if ( len <= 4 && ret != len ) + log(_logLevel) << "Socket recv() got " << ret << " bytes wanted len=" << len << endl; + assert( ret <= len ); + len -= ret; + buf += ret; + } + else if ( ret == 0 ) { + log(3) << "Socket recv() conn closed? " << remoteString() << endl; + throw SocketException( SocketException::CLOSED , remoteString() ); + } + else { /* ret < 0 */ +#if defined(_WIN32) + int e = WSAGetLastError(); +#else + int e = errno; +# if defined(EINTR) + if( e == EINTR ) { + if( ++retries == 1 ) { + log() << "EINTR retry" << endl; + continue; + } + } +# endif +#endif + if ( ( e == EAGAIN +#if defined(_WIN32) + || e == WSAETIMEDOUT +#endif + ) && _timeout > 0 ) + { + // this is a timeout + log(_logLevel) << "Socket recv() timeout " << remoteString() <<endl; + throw SocketException( SocketException::RECV_TIMEOUT, remoteString() ); + } + + log(_logLevel) << "Socket recv() " << errnoWithDescription(e) << " " << remoteString() <<endl; + throw SocketException( SocketException::RECV_ERROR , remoteString() ); + } + } + } + + int Socket::unsafe_recv( char *buf, int max ) { + int x = _recv( buf , max ); + _bytesIn += x; + return x; + } + + + int Socket::_recv( char *buf, int max ) { +#ifdef MONGO_SSL + if ( _ssl ){ + return SSL_read( _ssl.get() , buf , max ); + } +#endif + return ::recv( _fd , buf , max , portRecvFlags ); + } + + void Socket::setTimeout( double secs ) { + struct timeval tv; + tv.tv_sec = (int)secs; + tv.tv_usec = (int)((long long)(secs*1000*1000) % (1000*1000)); + bool report = logLevel > 3; // solaris doesn't provide these + DEV report = true; + bool ok = setsockopt(_fd, SOL_SOCKET, SO_RCVTIMEO, (char *) &tv, sizeof(tv) ) == 0; + if( report && !ok ) log() << "unabled to set SO_RCVTIMEO" << endl; + ok = setsockopt(_fd, SOL_SOCKET, SO_SNDTIMEO, (char *) &tv, sizeof(tv) ) == 0; + DEV if( report && !ok ) log() << "unabled to set SO_RCVTIMEO" << endl; + } + +#if defined(_WIN32) + struct WinsockInit { + WinsockInit() { + WSADATA d; + if ( WSAStartup(MAKEWORD(2,2), &d) != 0 ) { + out() << "ERROR: wsastartup failed " << errnoWithDescription() << endl; + problem() << "ERROR: wsastartup failed " << errnoWithDescription() << endl; + dbexit( EXIT_NTSERVICE_ERROR ); + } + } + } winsock_init; +#endif + +} // namespace mongo diff --git a/util/net/sock.h b/util/net/sock.h new file mode 100644 index 0000000..1cd5133 --- /dev/null +++ b/util/net/sock.h @@ -0,0 +1,256 @@ +// @file sock.h + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "../../pch.h" + +#include <stdio.h> +#include <sstream> +#include "../goodies.h" +#include "../../db/cmdline.h" +#include "../mongoutils/str.h" + +#ifndef _WIN32 + +#include <sys/socket.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <sys/un.h> +#include <errno.h> + +#ifdef __openbsd__ +# include <sys/uio.h> +#endif + +#endif // _WIN32 + +#ifdef MONGO_SSL +#include <openssl/ssl.h> +#endif + +namespace mongo { + + const int SOCK_FAMILY_UNKNOWN_ERROR=13078; + + void disableNagle(int sock); + +#if defined(_WIN32) + + typedef short sa_family_t; + typedef int socklen_t; + + // This won't actually be used on windows + struct sockaddr_un { + short sun_family; + char sun_path[108]; // length from unix header + }; + +#else // _WIN32 + + inline void closesocket(int s) { close(s); } + const int INVALID_SOCKET = -1; + typedef int SOCKET; + +#endif // _WIN32 + + inline string makeUnixSockPath(int port) { + return mongoutils::str::stream() << cmdLine.socket << "/mongodb-" << port << ".sock"; + } + + // If an ip address is passed in, just return that. If a hostname is passed + // in, look up its ip and return that. Returns "" on failure. + string hostbyname(const char *hostname); + + void enableIPv6(bool state=true); + bool IPv6Enabled(); + void setSockTimeouts(int sock, double secs); + + /** + * wrapped around os representation of network address + */ + struct SockAddr { + SockAddr() { + addressSize = sizeof(sa); + memset(&sa, 0, sizeof(sa)); + sa.ss_family = AF_UNSPEC; + } + SockAddr(int sourcePort); /* listener side */ + SockAddr(const char *ip, int port); /* EndPoint (remote) side, or if you want to specify which interface locally */ + + template <typename T> T& as() { return *(T*)(&sa); } + template <typename T> const T& as() const { return *(const T*)(&sa); } + + string toString(bool includePort=true) const; + + /** + * @return one of AF_INET, AF_INET6, or AF_UNIX + */ + sa_family_t getType() const; + + unsigned getPort() const; + + string getAddr() const; + + bool isLocalHost() const; + + bool operator==(const SockAddr& r) const; + + bool operator!=(const SockAddr& r) const; + + bool operator<(const SockAddr& r) const; + + const sockaddr* raw() const {return (sockaddr*)&sa;} + sockaddr* raw() {return (sockaddr*)&sa;} + + socklen_t addressSize; + private: + struct sockaddr_storage sa; + }; + + extern SockAddr unknownAddress; // ( "0.0.0.0", 0 ) + + /** this is not cache and does a syscall */ + string getHostName(); + + /** this is cached, so if changes during the process lifetime + * will be stale */ + string getHostNameCached(); + + /** + * thrown by Socket and SockAddr + */ + class SocketException : public DBException { + public: + const enum Type { CLOSED , RECV_ERROR , SEND_ERROR, RECV_TIMEOUT, SEND_TIMEOUT, FAILED_STATE, CONNECT_ERROR } _type; + + SocketException( Type t , string server , int code = 9001 , string extra="" ) + : DBException( "socket exception" , code ) , _type(t) , _server(server), _extra(extra){ } + virtual ~SocketException() throw() {} + + bool shouldPrint() const { return _type != CLOSED; } + virtual string toString() const; + + private: + string _server; + string _extra; + }; + +#ifdef MONGO_SSL + class SSLManager : boost::noncopyable { + public: + SSLManager( bool client ); + + void setupPEM( const string& keyFile , const string& password ); + void setupPubPriv( const string& privateKeyFile , const string& publicKeyFile ); + + /** + * creates an SSL context to be used for this file descriptor + * caller should delete + */ + SSL * secure( int fd ); + + static int password_cb( char *buf,int num, int rwflag,void *userdata ); + + private: + bool _client; + SSL_CTX* _context; + string _password; + }; +#endif + + /** + * thin wrapped around file descriptor and system calls + * todo: ssl + */ + class Socket { + public: + Socket(int sock, const SockAddr& farEnd); + + /** In some cases the timeout will actually be 2x this value - eg we do a partial send, + then the timeout fires, then we try to send again, then the timeout fires again with + no data sent, then we detect that the other side is down. + + Generally you don't want a timeout, you should be very prepared for errors if you set one. + */ + Socket(double so_timeout = 0, int logLevel = 0 ); + + bool connect(SockAddr& farEnd); + void close(); + + void send( const char * data , int len, const char *context ); + void send( const vector< pair< char *, int > > &data, const char *context ); + + // recv len or throw SocketException + void recv( char * data , int len ); + int unsafe_recv( char *buf, int max ); + + int getLogLevel() const { return _logLevel; } + void setLogLevel( int ll ) { _logLevel = ll; } + + SockAddr remoteAddr() const { return _remote; } + string remoteString() const { return _remote.toString(); } + unsigned remotePort() const { return _remote.getPort(); } + + void clearCounters() { _bytesIn = 0; _bytesOut = 0; } + long long getBytesIn() const { return _bytesIn; } + long long getBytesOut() const { return _bytesOut; } + + void setTimeout( double secs ); + +#ifdef MONGO_SSL + /** secures inline */ + void secure( SSLManager * ssl ); + + void secureAccepted( SSLManager * ssl ); +#endif + + /** + * call this after a fork for server sockets + */ + void postFork(); + + private: + void _init(); + /** raw send, same semantics as ::send */ + int _send( const char * data , int len ); + + /** sends dumbly, just each buffer at a time */ + void _send( const vector< pair< char *, int > > &data, const char *context ); + + /** raw recv, same semantics as ::recv */ + int _recv( char * buf , int max ); + + int _fd; + SockAddr _remote; + double _timeout; + + long long _bytesIn; + long long _bytesOut; + +#ifdef MONGO_SSL + shared_ptr<SSL> _ssl; + SSLManager * _sslAccepted; +#endif + + protected: + int _logLevel; // passed to log() when logging errors + + }; + + +} // namespace mongo diff --git a/util/optime.h b/util/optime.h index 7e6be4d..9f78fda 100644 --- a/util/optime.h +++ b/util/optime.h @@ -26,7 +26,7 @@ namespace mongo { ClockSkewException() : DBException( "clock skew exception" , 20001 ) {} }; - /* replsets use RSOpTime. + /* replsets used to use RSOpTime. M/S uses OpTime. But this is useable from both. */ @@ -36,9 +36,10 @@ namespace mongo { */ #pragma pack(4) class OpTime { - unsigned i; + unsigned i; // ordinal comes first so we can do a single 64 bit compare on little endian unsigned secs; static OpTime last; + static OpTime skewed(); public: static void setLast(const Date_t &date) { last = OpTime(date); @@ -46,47 +47,48 @@ namespace mongo { unsigned getSecs() const { return secs; } + unsigned getInc() const { + return i; + } OpTime(Date_t date) { reinterpret_cast<unsigned long long&>(*this) = date.millis; + dassert( (int)secs >= 0 ); } OpTime(ReplTime x) { reinterpret_cast<unsigned long long&>(*this) = x; + dassert( (int)secs >= 0 ); } OpTime(unsigned a, unsigned b) { secs = a; i = b; + dassert( (int)secs >= 0 ); } OpTime( const OpTime& other ) { secs = other.secs; i = other.i; + dassert( (int)secs >= 0 ); } OpTime() { secs = 0; i = 0; } - static OpTime now() { + // it isn't generally safe to not be locked for this. so use now(). some tests use this. + static OpTime now_inlock() { unsigned t = (unsigned) time(0); - if ( t < last.secs ) { - bool toLog = false; - ONCE toLog = true; - RARELY toLog = true; - if ( last.i & 0x80000000 ) - toLog = true; - if ( toLog ) - log() << "clock skew detected prev: " << last.secs << " now: " << t << " trying to handle..." << endl; - if ( last.i & 0x80000000 ) { - log() << "ERROR Large clock skew detected, shutting down" << endl; - throw ClockSkewException(); - } - t = last.secs; - } if ( last.secs == t ) { last.i++; return last; } + if ( t < last.secs ) { + return skewed(); // separate function to keep out of the hot code path + } last = OpTime(t, 1); return last; } + static OpTime now() { + DEV dbMutex.assertWriteLocked(); + return now_inlock(); + } /* We store OpTime's in the database as BSON Date datatype -- we needed some sort of 64 bit "container" for these values. While these are not really "Dates", that seems a diff --git a/util/paths.h b/util/paths.h index ce0a378..2297a9a 100644 --- a/util/paths.h +++ b/util/paths.h @@ -19,10 +19,13 @@ #pragma once #include "mongoutils/str.h" - -using namespace mongoutils; +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> namespace mongo { + + using namespace mongoutils; extern string dbpath; @@ -76,4 +79,39 @@ namespace mongo { }; + inline dev_t getPartition(const string& path){ + struct stat stats; + + if (stat(path.c_str(), &stats) != 0){ + uasserted(13646, str::stream() << "stat() failed for file: " << path << " " << errnoWithDescription()); + } + + return stats.st_dev; + } + + inline bool onSamePartition(const string& path1, const string& path2){ + dev_t dev1 = getPartition(path1); + dev_t dev2 = getPartition(path2); + + return dev1 == dev2; + } + + inline void flushMyDirectory(const boost::filesystem::path& file){ +#ifdef __linux__ // this isn't needed elsewhere + massert(13652, str::stream() << "Couldn't find parent dir for file: " << file.string(), file.has_branch_path()); + boost::filesystem::path dir = file.branch_path(); // parent_path in new boosts + + log(1) << "flushing directory " << dir.string() << endl; + + int fd = ::open(dir.string().c_str(), O_RDONLY); // DO NOT THROW OR ASSERT BEFORE CLOSING + massert(13650, str::stream() << "Couldn't open directory '" << dir.string() << "' for flushing: " << errnoWithDescription(), fd >= 0); + if (fsync(fd) != 0){ + int e = errno; + close(fd); + massert(13651, str::stream() << "Couldn't fsync directory '" << dir.string() << "': " << errnoWithDescription(e), false); + } + close(fd); +#endif + } + } diff --git a/util/processinfo.h b/util/processinfo.h index b10e6fe..5272831 100644 --- a/util/processinfo.h +++ b/util/processinfo.h @@ -53,8 +53,8 @@ namespace mongo { bool supported(); - bool blockCheckSupported(); - bool blockInMemory( char * start ); + static bool blockCheckSupported(); + static bool blockInMemory( char * start ); private: pid_t _pid; @@ -62,6 +62,6 @@ namespace mongo { void writePidFile( const std::string& path ); - void printMemInfo( const char * where ); + void printMemInfo( const char * whereContextStr = 0 ); } diff --git a/util/processinfo_darwin.cpp b/util/processinfo_darwin.cpp index c1190ae..9f73cbf 100644 --- a/util/processinfo_darwin.cpp +++ b/util/processinfo_darwin.cpp @@ -19,15 +19,14 @@ #include "processinfo.h" #include "log.h" - +#include <mach/vm_statistics.h> #include <mach/task_info.h> - #include <mach/mach_init.h> #include <mach/mach_host.h> #include <mach/mach_traps.h> #include <mach/task.h> #include <mach/vm_map.h> -#include <mach/shared_memory_server.h> +#include <mach/shared_region.h> #include <iostream> #include <sys/types.h> diff --git a/util/processinfo_win32.cpp b/util/processinfo_win32.cpp index d62b21b..ec66aec 100644 --- a/util/processinfo_win32.cpp +++ b/util/processinfo_win32.cpp @@ -17,10 +17,7 @@ #include "pch.h" #include "processinfo.h" - #include <iostream> - -#include <windows.h> #include <psapi.h> using namespace std; diff --git a/util/queue.h b/util/queue.h index 6a1e33a..4223bd6 100644 --- a/util/queue.h +++ b/util/queue.h @@ -43,6 +43,12 @@ namespace mongo { return _queue.empty(); } + size_t size() const { + scoped_lock l( _lock ); + return _queue.size(); + } + + bool tryPop( T & t ) { scoped_lock l( _lock ); if ( _queue.empty() ) diff --git a/util/ramlog.cpp b/util/ramlog.cpp new file mode 100644 index 0000000..69ffc17 --- /dev/null +++ b/util/ramlog.cpp @@ -0,0 +1,190 @@ +// ramlog.cpp + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "pch.h" +#include "log.h" +#include "ramlog.h" +#include "mongoutils/html.h" +#include "mongoutils/str.h" + +namespace mongo { + + using namespace mongoutils; + + RamLog::RamLog( string name ) : _name(name), _lastWrite(0) { + h = 0; n = 0; + for( int i = 0; i < N; i++ ) + lines[i][C-1] = 0; + + if ( name.size() ) { + + if ( ! _namedLock ) + _namedLock = new mongo::mutex("RamLog::_namedLock"); + + scoped_lock lk( *_namedLock ); + if ( ! _named ) + _named = new RM(); + (*_named)[name] = this; + } + + } + + RamLog::~RamLog() { + + } + + void RamLog::write(LogLevel ll, const string& str) { + _lastWrite = time(0); + + char *p = lines[(h+n)%N]; + + unsigned sz = str.size(); + if( sz < C ) { + if ( str.c_str()[sz-1] == '\n' ) { + memcpy(p, str.c_str(), sz-1); + p[sz-1] = 0; + } + else + strcpy(p, str.c_str()); + } + else { + memcpy(p, str.c_str(), C-1); + } + + if( n < N ) n++; + else h = (h+1) % N; + } + + void RamLog::get( vector<const char*>& v) const { + for( unsigned x=0, i=h; x++ < n; i=(i+1)%N ) + v.push_back(lines[i]); + } + + int RamLog::repeats(const vector<const char *>& v, int i) { + for( int j = i-1; j >= 0 && j+8 > i; j-- ) { + if( strcmp(v[i]+20,v[j]+20) == 0 ) { + for( int x = 1; ; x++ ) { + if( j+x == i ) return j; + if( i+x>=(int) v.size() ) return -1; + if( strcmp(v[i+x]+20,v[j+x]+20) ) return -1; + } + return -1; + } + } + return -1; + } + + + string RamLog::clean(const vector<const char *>& v, int i, string line ) { + if( line.empty() ) line = v[i]; + if( i > 0 && strncmp(v[i], v[i-1], 11) == 0 ) + return string(" ") + line.substr(11); + return v[i]; + } + + string RamLog::color(string line) { + string s = str::after(line, "replSet "); + if( str::startsWith(s, "warning") || startsWith(s, "error") ) + return html::red(line); + if( str::startsWith(s, "info") ) { + if( str::endsWith(s, " up\n") ) + return html::green(line); + else if( str::contains(s, " down ") || str::endsWith(s, " down\n") ) + return html::yellow(line); + return line; //html::blue(line); + } + + return line; + } + + /* turn http:... into an anchor */ + string RamLog::linkify(const char *s) { + const char *p = s; + const char *h = strstr(p, "http://"); + if( h == 0 ) return s; + + const char *sp = h + 7; + while( *sp && *sp != ' ' ) sp++; + + string url(h, sp-h); + stringstream ss; + ss << string(s, h-s) << "<a href=\"" << url << "\">" << url << "</a>" << sp; + return ss.str(); + } + + void RamLog::toHTML(stringstream& s) { + vector<const char*> v; + get( v ); + + s << "<pre>\n"; + for( int i = 0; i < (int)v.size(); i++ ) { + assert( strlen(v[i]) > 20 ); + int r = repeats(v, i); + if( r < 0 ) { + s << color( linkify( clean(v,i).c_str() ) ); + } + else { + stringstream x; + x << string(v[i], 0, 20); + int nr = (i-r); + int last = i+nr-1; + for( ; r < i ; r++ ) x << '.'; + if( 1 ) { + stringstream r; + if( nr == 1 ) r << "repeat last line"; + else r << "repeats last " << nr << " lines; ends " << string(v[last]+4,0,15); + s << html::a("", r.str(), clean(v,i,x.str())); + } + else s << x.str(); + s << '\n'; + i = last; + } + } + s << "</pre>\n"; + } + + // --------------- + // static things + // --------------- + + RamLog* RamLog::get( string name ) { + if ( ! _named ) + return 0; + + scoped_lock lk( *_namedLock ); + RM::iterator i = _named->find( name ); + if ( i == _named->end() ) + return 0; + return i->second; + } + + void RamLog::getNames( vector<string>& names ) { + if ( ! _named ) + return; + + scoped_lock lk( *_namedLock ); + for ( RM::iterator i=_named->begin(); i!=_named->end(); ++i ) { + if ( i->second->n ) + names.push_back( i->first ); + } + } + + mongo::mutex* RamLog::_namedLock; + RamLog::RM* RamLog::_named = 0; + + Tee* const warnings = new RamLog("warnings"); // Things put here go in serverStatus +} diff --git a/util/ramlog.h b/util/ramlog.h index b2f3aa0..d3d5c8f 100644 --- a/util/ramlog.h +++ b/util/ramlog.h @@ -1,4 +1,4 @@ -// log.h +// ramlog.h /* Copyright 2009 10gen Inc. * @@ -18,124 +18,48 @@ #pragma once #include "log.h" -#include "mongoutils/html.h" namespace mongo { class RamLog : public Tee { - enum { - N = 128, - C = 256 - }; - char lines[N][C]; - unsigned h, n; - public: - RamLog() { - h = 0; n = 0; - for( int i = 0; i < N; i++ ) - lines[i][C-1] = 0; - } - - virtual void write(LogLevel ll, const string& str) { - char *p = lines[(h+n)%N]; - if( str.size() < C ) - strcpy(p, str.c_str()); - else - memcpy(p, str.c_str(), C-1); - if( n < N ) n++; - else h = (h+1) % N; - } - - void get( vector<const char*>& v) const { - for( unsigned x=0, i=h; x++ < n; i=(i+1)%N ) - v.push_back(lines[i]); - } - - static int repeats(const vector<const char *>& v, int i) { - for( int j = i-1; j >= 0 && j+8 > i; j-- ) { - if( strcmp(v[i]+20,v[j]+20) == 0 ) { - for( int x = 1; ; x++ ) { - if( j+x == i ) return j; - if( i+x>=(int) v.size() ) return -1; - if( strcmp(v[i+x]+20,v[j+x]+20) ) return -1; - } - return -1; - } - } - return -1; - } - - - static string clean(const vector<const char *>& v, int i, string line="") { - if( line.empty() ) line = v[i]; - if( i > 0 && strncmp(v[i], v[i-1], 11) == 0 ) - return string(" ") + line.substr(11); - return v[i]; - } - - static string color(string line) { - string s = str::after(line, "replSet "); - if( str::startsWith(s, "warning") || startsWith(s, "error") ) - return html::red(line); - if( str::startsWith(s, "info") ) { - if( str::endsWith(s, " up\n") ) - return html::green(line); - else if( str::contains(s, " down ") || str::endsWith(s, " down\n") ) - return html::yellow(line); - return line; //html::blue(line); - } - - return line; - } + RamLog( string name ); + + virtual void write(LogLevel ll, const string& str); + + void get( vector<const char*>& v) const; + + void toHTML(stringstream& s); + + static RamLog* get( string name ); + static void getNames( vector<string>& names ); + + time_t lastWrite() { return _lastWrite; } // 0 if no writes + + protected: + static int repeats(const vector<const char *>& v, int i); + static string clean(const vector<const char *>& v, int i, string line=""); + static string color(string line); /* turn http:... into an anchor */ - string linkify(const char *s) { - const char *p = s; - const char *h = strstr(p, "http://"); - if( h == 0 ) return s; - - const char *sp = h + 7; - while( *sp && *sp != ' ' ) sp++; - - string url(h, sp-h); - stringstream ss; - ss << string(s, h-s) << "<a href=\"" << url << "\">" << url << "</a>" << sp; - return ss.str(); - } - - void toHTML(stringstream& s) { - vector<const char*> v; - get( v ); - - s << "<pre>\n"; - for( int i = 0; i < (int)v.size(); i++ ) { - assert( strlen(v[i]) > 20 ); - int r = repeats(v, i); - if( r < 0 ) { - s << color( linkify( clean(v,i).c_str() ) ); - } - else { - stringstream x; - x << string(v[i], 0, 20); - int nr = (i-r); - int last = i+nr-1; - for( ; r < i ; r++ ) x << '.'; - if( 1 ) { - stringstream r; - if( nr == 1 ) r << "repeat last line"; - else r << "repeats last " << nr << " lines; ends " << string(v[last]+4,0,15); - s << html::a("", r.str(), clean(v,i,x.str())); - } - else s << x.str(); - s << '\n'; - i = last; - } - } - s << "</pre>\n"; - } + static string linkify(const char *s); + private: + ~RamLog(); // want this private as we want to leak so we can use them till the very end + enum { + N = 128, // number of links + C = 256 // max size of line + }; + char lines[N][C]; + unsigned h; // current position + unsigned n; // numer of lines stores 0 o N + string _name; + + typedef map<string,RamLog*> RM; + static mongo::mutex* _namedLock; + static RM* _named; + time_t _lastWrite; }; } diff --git a/util/sock.cpp b/util/sock.cpp deleted file mode 100644 index ef3ed0e..0000000 --- a/util/sock.cpp +++ /dev/null @@ -1,235 +0,0 @@ -// @file sock.cpp - -/* Copyright 2009 10gen Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "pch.h" -#include "sock.h" - -namespace mongo { - - static mongo::mutex sock_mutex("sock_mutex"); - - static bool ipv6 = false; - void enableIPv6(bool state) { ipv6 = state; } - bool IPv6Enabled() { return ipv6; } - - string getAddrInfoStrError(int code) { -#if !defined(_WIN32) - return gai_strerror(code); -#else - /* gai_strerrorA is not threadsafe on windows. don't use it. */ - return errnoWithDescription(code); -#endif - } - - SockAddr::SockAddr(int sourcePort) { - memset(as<sockaddr_in>().sin_zero, 0, sizeof(as<sockaddr_in>().sin_zero)); - as<sockaddr_in>().sin_family = AF_INET; - as<sockaddr_in>().sin_port = htons(sourcePort); - as<sockaddr_in>().sin_addr.s_addr = htonl(INADDR_ANY); - addressSize = sizeof(sockaddr_in); - } - - SockAddr::SockAddr(const char * iporhost , int port) { - if (!strcmp(iporhost, "localhost")) - iporhost = "127.0.0.1"; - - if (strchr(iporhost, '/')) { -#ifdef _WIN32 - uassert(13080, "no unix socket support on windows", false); -#endif - uassert(13079, "path to unix socket too long", strlen(iporhost) < sizeof(as<sockaddr_un>().sun_path)); - as<sockaddr_un>().sun_family = AF_UNIX; - strcpy(as<sockaddr_un>().sun_path, iporhost); - addressSize = sizeof(sockaddr_un); - } - else { - addrinfo* addrs = NULL; - addrinfo hints; - memset(&hints, 0, sizeof(addrinfo)); - hints.ai_socktype = SOCK_STREAM; - //hints.ai_flags = AI_ADDRCONFIG; // This is often recommended but don't do it. SERVER-1579 - hints.ai_flags |= AI_NUMERICHOST; // first pass tries w/o DNS lookup - hints.ai_family = (IPv6Enabled() ? AF_UNSPEC : AF_INET); - - stringstream ss; - ss << port; - int ret = getaddrinfo(iporhost, ss.str().c_str(), &hints, &addrs); - - // old C compilers on IPv6-capable hosts return EAI_NODATA error -#ifdef EAI_NODATA - int nodata = (ret == EAI_NODATA); -#else - int nodata = false; -#endif - if (ret == EAI_NONAME || nodata) { - // iporhost isn't an IP address, allow DNS lookup - hints.ai_flags &= ~AI_NUMERICHOST; - ret = getaddrinfo(iporhost, ss.str().c_str(), &hints, &addrs); - } - - if (ret) { - log() << "getaddrinfo(\"" << iporhost << "\") failed: " << gai_strerror(ret) << endl; - *this = SockAddr(port); - } - else { - //TODO: handle other addresses in linked list; - assert(addrs->ai_addrlen <= sizeof(sa)); - memcpy(&sa, addrs->ai_addr, addrs->ai_addrlen); - addressSize = addrs->ai_addrlen; - freeaddrinfo(addrs); - } - } - } - - bool SockAddr::isLocalHost() const { - switch (getType()) { - case AF_INET: return getAddr() == "127.0.0.1"; - case AF_INET6: return getAddr() == "::1"; - case AF_UNIX: return true; - default: return false; - } - assert(false); - return false; - } - - string hostbyname(const char *hostname) { - string addr = SockAddr(hostname, 0).getAddr(); - if (addr == "0.0.0.0") - return ""; - else - return addr; - } - - class UDPConnection { - public: - UDPConnection() { - sock = 0; - } - ~UDPConnection() { - if ( sock ) { - closesocket(sock); - sock = 0; - } - } - bool init(const SockAddr& myAddr); - int recvfrom(char *buf, int len, SockAddr& sender); - int sendto(char *buf, int len, const SockAddr& EndPoint); - int mtu(const SockAddr& sa) { - return sa.isLocalHost() ? 16384 : 1480; - } - - SOCKET sock; - }; - - inline int UDPConnection::recvfrom(char *buf, int len, SockAddr& sender) { - return ::recvfrom(sock, buf, len, 0, sender.raw(), &sender.addressSize); - } - - inline int UDPConnection::sendto(char *buf, int len, const SockAddr& EndPoint) { - if ( 0 && rand() < (RAND_MAX>>4) ) { - out() << " NOTSENT "; - return 0; - } - return ::sendto(sock, buf, len, 0, EndPoint.raw(), EndPoint.addressSize); - } - - inline bool UDPConnection::init(const SockAddr& myAddr) { - sock = socket(myAddr.getType(), SOCK_DGRAM, IPPROTO_UDP); - if ( sock == INVALID_SOCKET ) { - out() << "invalid socket? " << errnoWithDescription() << endl; - return false; - } - if ( ::bind(sock, myAddr.raw(), myAddr.addressSize) != 0 ) { - out() << "udp init failed" << endl; - closesocket(sock); - sock = 0; - return false; - } - socklen_t optLen; - int rcvbuf; - if (getsockopt(sock, - SOL_SOCKET, - SO_RCVBUF, - (char*)&rcvbuf, - &optLen) != -1) - out() << "SO_RCVBUF:" << rcvbuf << endl; - return true; - } - - void sendtest() { - out() << "sendtest\n"; - SockAddr me(27016); - SockAddr dest("127.0.0.1", 27015); - UDPConnection c; - if ( c.init(me) ) { - char buf[256]; - out() << "sendto: "; - out() << c.sendto(buf, sizeof(buf), dest) << " " << errnoWithDescription() << endl; - } - out() << "end\n"; - } - - void listentest() { - out() << "listentest\n"; - SockAddr me(27015); - SockAddr sender; - UDPConnection c; - if ( c.init(me) ) { - char buf[256]; - out() << "recvfrom: "; - out() << c.recvfrom(buf, sizeof(buf), sender) << " " << errnoWithDescription() << endl; - } - out() << "end listentest\n"; - } - - void xmain(); - -#if defined(_WIN32) - namespace { - struct WinsockInit { - WinsockInit() { - WSADATA d; - if ( WSAStartup(MAKEWORD(2,2), &d) != 0 ) { - out() << "ERROR: wsastartup failed " << errnoWithDescription() << endl; - problem() << "ERROR: wsastartup failed " << errnoWithDescription() << endl; - dbexit( EXIT_NTSERVICE_ERROR ); - } - } - } winsock_init; - } -#endif - - SockAddr unknownAddress( "0.0.0.0", 0 ); - - ListeningSockets* ListeningSockets::_instance = new ListeningSockets(); - - ListeningSockets* ListeningSockets::get() { - return _instance; - } - - string _hostNameCached; - static void _hostNameCachedInit() { - _hostNameCached = getHostName(); - } - boost::once_flag _hostNameCachedInitFlags = BOOST_ONCE_INIT; - - string getHostNameCached() { - boost::call_once( _hostNameCachedInit , _hostNameCachedInitFlags ); - return _hostNameCached; - } - -} // namespace mongo diff --git a/util/sock.h b/util/sock.h deleted file mode 100644 index 54dfb49..0000000 --- a/util/sock.h +++ /dev/null @@ -1,303 +0,0 @@ -// @file sock.h - -/* Copyright 2009 10gen Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include "../pch.h" - -#include <stdio.h> -#include <sstream> -#include "goodies.h" -#include "../db/jsobj.h" -#include "../db/cmdline.h" - -namespace mongo { - - const int SOCK_FAMILY_UNKNOWN_ERROR=13078; - string getAddrInfoStrError(int code); - -#if defined(_WIN32) - - typedef short sa_family_t; - typedef int socklen_t; - inline int getLastError() { return WSAGetLastError(); } - inline void disableNagle(int sock) { - int x = 1; - if ( setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char *) &x, sizeof(x)) ) - out() << "ERROR: disableNagle failed" << endl; - if ( setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (char *) &x, sizeof(x)) ) - out() << "ERROR: SO_KEEPALIVE failed" << endl; - } - inline void prebindOptions( int sock ) { } - - // This won't actually be used on windows - struct sockaddr_un { - short sun_family; - char sun_path[108]; // length from unix header - }; - -#else - - extern CmdLine cmdLine; - -} // namespace mongo - -#include <sys/socket.h> -#include <sys/types.h> -#include <sys/socket.h> -#include <sys/un.h> -#include <netinet/in.h> -#include <netinet/tcp.h> -#include <arpa/inet.h> -#include <errno.h> -#include <netdb.h> -#ifdef __openbsd__ -# include <sys/uio.h> -#endif - -#ifndef AI_ADDRCONFIG -# define AI_ADDRCONFIG 0 -#endif - -namespace mongo { - - inline void closesocket(int s) { - close(s); - } - const int INVALID_SOCKET = -1; - typedef int SOCKET; - - inline void disableNagle(int sock) { - int x = 1; - -#ifdef SOL_TCP - int level = SOL_TCP; -#else - int level = SOL_SOCKET; -#endif - - if ( setsockopt(sock, level, TCP_NODELAY, (char *) &x, sizeof(x)) ) - log() << "ERROR: disableNagle failed: " << errnoWithDescription() << endl; - -#ifdef SO_KEEPALIVE - if ( setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (char *) &x, sizeof(x)) ) - log() << "ERROR: SO_KEEPALIVE failed: " << errnoWithDescription() << endl; -#endif - - } - inline void prebindOptions( int sock ) { - DEV log() << "doing prebind option" << endl; - int x = 1; - if ( setsockopt( sock , SOL_SOCKET, SO_REUSEADDR, &x, sizeof(x)) < 0 ) - out() << "Failed to set socket opt, SO_REUSEADDR" << endl; - } - - -#endif - - inline string makeUnixSockPath(int port) { - return cmdLine.socket + "/mongodb-" + BSONObjBuilder::numStr(port) + ".sock"; - } - - inline void setSockTimeouts(int sock, double secs) { - struct timeval tv; - tv.tv_sec = (int)secs; - tv.tv_usec = (int)((long long)(secs*1000*1000) % (1000*1000)); - bool report = logLevel > 3; // solaris doesn't provide these - DEV report = true; - bool ok = setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (char *) &tv, sizeof(tv) ) == 0; - if( report && !ok ) log() << "unabled to set SO_RCVTIMEO" << endl; - ok = setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, (char *) &tv, sizeof(tv) ) == 0; - DEV if( report && !ok ) log() << "unabled to set SO_RCVTIMEO" << endl; - } - - // If an ip address is passed in, just return that. If a hostname is passed - // in, look up its ip and return that. Returns "" on failure. - string hostbyname(const char *hostname); - - void enableIPv6(bool state=true); - bool IPv6Enabled(); - - struct SockAddr { - SockAddr() { - addressSize = sizeof(sa); - memset(&sa, 0, sizeof(sa)); - sa.ss_family = AF_UNSPEC; - } - SockAddr(int sourcePort); /* listener side */ - SockAddr(const char *ip, int port); /* EndPoint (remote) side, or if you want to specify which interface locally */ - - template <typename T> - T& as() { return *(T*)(&sa); } - template <typename T> - const T& as() const { return *(const T*)(&sa); } - - string toString(bool includePort=true) const { - string out = getAddr(); - if (includePort && getType() != AF_UNIX && getType() != AF_UNSPEC) - out += ':' + BSONObjBuilder::numStr(getPort()); - return out; - } - - // returns one of AF_INET, AF_INET6, or AF_UNIX - sa_family_t getType() const { - return sa.ss_family; - } - - unsigned getPort() const { - switch (getType()) { - case AF_INET: return ntohs(as<sockaddr_in>().sin_port); - case AF_INET6: return ntohs(as<sockaddr_in6>().sin6_port); - case AF_UNIX: return 0; - case AF_UNSPEC: return 0; - default: massert(SOCK_FAMILY_UNKNOWN_ERROR, "unsupported address family", false); return 0; - } - } - - string getAddr() const { - switch (getType()) { - case AF_INET: - case AF_INET6: { - const int buflen=128; - char buffer[buflen]; - int ret = getnameinfo(raw(), addressSize, buffer, buflen, NULL, 0, NI_NUMERICHOST); - massert(13082, getAddrInfoStrError(ret), ret == 0); - return buffer; - } - - case AF_UNIX: return (addressSize > 2 ? as<sockaddr_un>().sun_path : "anonymous unix socket"); - case AF_UNSPEC: return "(NONE)"; - default: massert(SOCK_FAMILY_UNKNOWN_ERROR, "unsupported address family", false); return ""; - } - } - - bool isLocalHost() const; - - bool operator==(const SockAddr& r) const { - if (getType() != r.getType()) - return false; - - if (getPort() != r.getPort()) - return false; - - switch (getType()) { - case AF_INET: return as<sockaddr_in>().sin_addr.s_addr == r.as<sockaddr_in>().sin_addr.s_addr; - case AF_INET6: return memcmp(as<sockaddr_in6>().sin6_addr.s6_addr, r.as<sockaddr_in6>().sin6_addr.s6_addr, sizeof(in6_addr)) == 0; - case AF_UNIX: return strcmp(as<sockaddr_un>().sun_path, r.as<sockaddr_un>().sun_path) == 0; - case AF_UNSPEC: return true; // assume all unspecified addresses are the same - default: massert(SOCK_FAMILY_UNKNOWN_ERROR, "unsupported address family", false); return false; - } - } - bool operator!=(const SockAddr& r) const { - return !(*this == r); - } - bool operator<(const SockAddr& r) const { - if (getType() < r.getType()) - return true; - else if (getType() > r.getType()) - return false; - - if (getPort() < r.getPort()) - return true; - else if (getPort() > r.getPort()) - return false; - - switch (getType()) { - case AF_INET: return as<sockaddr_in>().sin_addr.s_addr < r.as<sockaddr_in>().sin_addr.s_addr; - case AF_INET6: return memcmp(as<sockaddr_in6>().sin6_addr.s6_addr, r.as<sockaddr_in6>().sin6_addr.s6_addr, sizeof(in6_addr)) < 0; - case AF_UNIX: return strcmp(as<sockaddr_un>().sun_path, r.as<sockaddr_un>().sun_path) < 0; - case AF_UNSPEC: return false; - default: massert(SOCK_FAMILY_UNKNOWN_ERROR, "unsupported address family", false); return false; - } - } - - const sockaddr* raw() const {return (sockaddr*)&sa;} - sockaddr* raw() {return (sockaddr*)&sa;} - - socklen_t addressSize; - private: - struct sockaddr_storage sa; - }; - - extern SockAddr unknownAddress; // ( "0.0.0.0", 0 ) - - const int MaxMTU = 16384; - - inline string getHostName() { - char buf[256]; - int ec = gethostname(buf, 127); - if ( ec || *buf == 0 ) { - log() << "can't get this server's hostname " << errnoWithDescription() << endl; - return ""; - } - return buf; - } - - string getHostNameCached(); - - class ListeningSockets { - public: - ListeningSockets() - : _mutex("ListeningSockets") - , _sockets( new set<int>() ) - , _socketPaths( new set<string>() ) - { } - void add( int sock ) { - scoped_lock lk( _mutex ); - _sockets->insert( sock ); - } - void addPath( string path ) { - scoped_lock lk( _mutex ); - _socketPaths->insert( path ); - } - void remove( int sock ) { - scoped_lock lk( _mutex ); - _sockets->erase( sock ); - } - void closeAll() { - set<int>* sockets; - set<string>* paths; - - { - scoped_lock lk( _mutex ); - sockets = _sockets; - _sockets = new set<int>(); - paths = _socketPaths; - _socketPaths = new set<string>(); - } - - for ( set<int>::iterator i=sockets->begin(); i!=sockets->end(); i++ ) { - int sock = *i; - log() << "closing listening socket: " << sock << endl; - closesocket( sock ); - } - - for ( set<string>::iterator i=paths->begin(); i!=paths->end(); i++ ) { - string path = *i; - log() << "removing socket file: " << path << endl; - ::remove( path.c_str() ); - } - } - static ListeningSockets* get(); - private: - mongo::mutex _mutex; - set<int>* _sockets; - set<string>* _socketPaths; // for unix domain sockets - static ListeningSockets* _instance; - }; - -} // namespace mongo diff --git a/util/stringutils.h b/util/stringutils.h index 60571e6..93598aa 100644 --- a/util/stringutils.h +++ b/util/stringutils.h @@ -15,12 +15,12 @@ * limitations under the License. */ -#ifndef UTIL_STRING_UTILS_HEADER -#define UTIL_STRING_UTILS_HEADER +#pragma once namespace mongo { // see also mongoutils/str.h - perhaps move these there? + // see also text.h void splitStringDelim( const string& str , vector<string>* res , char delim ); @@ -40,6 +40,100 @@ namespace mongo { return string(copy); } -} // namespace mongo + /** + * Non numeric characters are compared lexicographically; numeric substrings + * are compared numerically; dots separate ordered comparable subunits. + * For convenience, character 255 is greater than anything else. + */ + inline int lexNumCmp( const char *s1, const char *s2 ) { + //cout << "START : " << s1 << "\t" << s2 << endl; + + bool startWord = true; + + while( *s1 && *s2 ) { + + bool d1 = ( *s1 == '.' ); + bool d2 = ( *s2 == '.' ); + if ( d1 && !d2 ) + return -1; + if ( d2 && !d1 ) + return 1; + if ( d1 && d2 ) { + ++s1; ++s2; + startWord = true; + continue; + } + + bool p1 = ( *s1 == (char)255 ); + bool p2 = ( *s2 == (char)255 ); + //cout << "\t\t " << p1 << "\t" << p2 << endl; + if ( p1 && !p2 ) + return 1; + if ( p2 && !p1 ) + return -1; + + bool n1 = isNumber( *s1 ); + bool n2 = isNumber( *s2 ); + + if ( n1 && n2 ) { + // get rid of leading 0s + if ( startWord ) { + while ( *s1 == '0' ) s1++; + while ( *s2 == '0' ) s2++; + } + + char * e1 = (char*)s1; + char * e2 = (char*)s2; + + // find length + // if end of string, will break immediately ('\0') + while ( isNumber (*e1) ) e1++; + while ( isNumber (*e2) ) e2++; + + int len1 = (int)(e1-s1); + int len2 = (int)(e2-s2); + + int result; + // if one is longer than the other, return + if ( len1 > len2 ) { + return 1; + } + else if ( len2 > len1 ) { + return -1; + } + // if the lengths are equal, just strcmp + else if ( (result = strncmp(s1, s2, len1)) != 0 ) { + return result; + } -#endif // UTIL_STRING_UTILS_HEADER + // otherwise, the numbers are equal + s1 = e1; + s2 = e2; + startWord = false; + continue; + } + + if ( n1 ) + return 1; + + if ( n2 ) + return -1; + + if ( *s1 > *s2 ) + return 1; + + if ( *s2 > *s1 ) + return -1; + + s1++; s2++; + startWord = false; + } + + if ( *s1 ) + return 1; + if ( *s2 ) + return -1; + return 0; + } + +} // namespace mongo diff --git a/util/time_support.h b/util/time_support.h index 5dedec9..ca17807 100644 --- a/util/time_support.h +++ b/util/time_support.h @@ -52,6 +52,16 @@ namespace mongo { return buf; } + inline string timeToISOString(time_t time) { + struct tm t; + time_t_to_Struct( time, &t ); + + const char* fmt = "%Y-%m-%dT%H:%M:%SZ"; + char buf[32]; + assert(strftime(buf, sizeof(buf), fmt, &t) == 20); + return buf; + } + inline boost::gregorian::date currentDate() { boost::posix_time::ptime now = boost::posix_time::second_clock::local_time(); return now.date(); @@ -161,35 +171,52 @@ namespace mongo { } #endif - // note this wraps - inline int tdiff(unsigned told, unsigned tnew) { - return WrappingInt::diff(tnew, told); + extern long long jsTime_virtual_skew; + extern boost::thread_specific_ptr<long long> jsTime_virtual_thread_skew; + + // DO NOT TOUCH except for testing + inline void jsTimeVirtualSkew( long long skew ){ + jsTime_virtual_skew = skew; + } + inline long long getJSTimeVirtualSkew(){ + return jsTime_virtual_skew; } - /** curTimeMillis will overflow - use curTimeMicros64 instead if you care about that. */ - inline unsigned curTimeMillis() { - boost::xtime xt; - boost::xtime_get(&xt, boost::TIME_UTC); - unsigned t = xt.nsec / 1000000; - return (xt.sec & 0xfffff) * 1000 + t; + inline void jsTimeVirtualThreadSkew( long long skew ){ + jsTime_virtual_thread_skew.reset(new long long(skew)); + } + inline long long getJSTimeVirtualThreadSkew(){ + if(jsTime_virtual_thread_skew.get()){ + return *(jsTime_virtual_thread_skew.get()); + } + else return 0; } /** Date_t is milliseconds since epoch */ + inline Date_t jsTime(); + + /** warning this will wrap */ + inline unsigned curTimeMicros(); + + inline unsigned long long curTimeMicros64(); +#ifdef _WIN32 // no gettimeofday on windows + inline unsigned long long curTimeMillis64() { + boost::xtime xt; + boost::xtime_get(&xt, boost::TIME_UTC); + return ((unsigned long long)xt.sec) * 1000 + xt.nsec / 1000000; + } inline Date_t jsTime() { boost::xtime xt; boost::xtime_get(&xt, boost::TIME_UTC); unsigned long long t = xt.nsec / 1000000; - return ((unsigned long long) xt.sec * 1000) + t; + return ((unsigned long long) xt.sec * 1000) + t + getJSTimeVirtualSkew() + getJSTimeVirtualThreadSkew(); } - inline unsigned long long curTimeMicros64() { boost::xtime xt; boost::xtime_get(&xt, boost::TIME_UTC); unsigned long long t = xt.nsec / 1000; return (((unsigned long long) xt.sec) * 1000000) + t; - } - - // measures up to 1024 seconds. or, 512 seconds with tdiff that is... + } inline unsigned curTimeMicros() { boost::xtime xt; boost::xtime_get(&xt, boost::TIME_UTC); @@ -197,5 +224,30 @@ namespace mongo { unsigned secs = xt.sec % 1024; return secs*1000000 + t; } +#else +# include <sys/time.h> + inline unsigned long long curTimeMillis64() { + timeval tv; + gettimeofday(&tv, NULL); + return ((unsigned long long)tv.tv_sec) * 1000 + tv.tv_usec / 1000; + } + inline Date_t jsTime() { + timeval tv; + gettimeofday(&tv, NULL); + unsigned long long t = tv.tv_usec / 1000; + return ((unsigned long long) tv.tv_sec * 1000) + t + getJSTimeVirtualSkew() + getJSTimeVirtualThreadSkew(); + } + inline unsigned long long curTimeMicros64() { + timeval tv; + gettimeofday(&tv, NULL); + return (((unsigned long long) tv.tv_sec) * 1000*1000) + tv.tv_usec; + } + inline unsigned curTimeMicros() { + timeval tv; + gettimeofday(&tv, NULL); + unsigned secs = tv.tv_sec % 1024; + return secs*1000*1000 + tv.tv_usec; + } +#endif } // namespace mongo diff --git a/util/timer.h b/util/timer.h index f5a21f8..cbfe859 100644 --- a/util/timer.h +++ b/util/timer.h @@ -24,44 +24,81 @@ namespace mongo { /** * simple scoped timer */ - class Timer { + class Timer /*copyable*/ { public: - Timer() { - reset(); - } - - Timer( unsigned long long start ) { - old = start; - } - - int seconds() const { - return (int)(micros() / 1000000); - } + Timer() { reset(); } + Timer( unsigned long long startMicros ) { old = startMicros; } + int seconds() const { return (int)(micros() / 1000000); } + int millis() const { return (int)(micros() / 1000); } + int minutes() const { return seconds() / 60; } + - int millis() const { - return (long)(micros() / 1000); + /** gets time interval and resets at the same time. this way we can call curTimeMicros + once instead of twice if one wanted millis() and then reset(). + @return time in millis + */ + int millisReset() { + unsigned long long now = curTimeMicros64(); + int m = (int)((now-old)/1000); + old = now; + return m; } + // note: dubious that the resolution is as anywhere near as high as ethod name implies! unsigned long long micros() const { unsigned long long n = curTimeMicros64(); return n - old; } - unsigned long long micros(unsigned long long & n) const { // returns cur time in addition to timer result n = curTimeMicros64(); return n - old; } - unsigned long long startTime() { - return old; - } - - void reset() { - old = curTimeMicros64(); - } - + unsigned long long startTime() const { return old; } + void reset() { old = curTimeMicros64(); } private: unsigned long long old; }; +#if 1 + class DevTimer { + public: + class scoped { + public: + scoped(DevTimer& dt) { } + ~scoped() { } + }; + DevTimer(string) { } + ~DevTimer() { } + }; +#elif defined(_WIN32) + class DevTimer { + const string _name; + public: + unsigned long long _ticks; + class scoped { + DevTimer& _dt; + unsigned long long _start; + public: + scoped(DevTimer& dt) : _dt(dt) { + LARGE_INTEGER i; + QueryPerformanceCounter(&i); + _start = i.QuadPart; + } + ~scoped() { + LARGE_INTEGER i; + QueryPerformanceCounter(&i); + _dt._ticks += (i.QuadPart - _start); + } + }; + DevTimer(string name) : _name(name), _ticks(0) { + } + ~DevTimer() { + LARGE_INTEGER freq; + assert( QueryPerformanceFrequency(&freq) );
+ cout << "devtimer\t" << _name << '\t' << _ticks*1000.0/freq.QuadPart << "ms" << endl; + } + }; +#endif + } // namespace mongo diff --git a/util/util.cpp b/util/util.cpp index 216683a..4528e30 100644 --- a/util/util.cpp +++ b/util/util.cpp @@ -21,6 +21,7 @@ #include "file_allocator.h" #include "optime.h" #include "time_support.h" +#include "mongoutils/str.h" namespace mongo { @@ -46,6 +47,13 @@ namespace mongo { static unsigned N = 0; if ( strcmp( name , "conn" ) == 0 ) { + string* x = _threadName.get(); + if ( x && mongoutils::str::startsWith( *x , "conn" ) ) { + int n = atoi( x->c_str() + 4 ); + if ( n > 0 ) + return n; + warning() << "unexpected thread name [" << *x << "] parsed to " << n << endl; + } unsigned n = ++N; stringstream ss; ss << name << n; @@ -142,17 +150,6 @@ namespace mongo { struct UtilTest : public UnitTest { void run() { - assert( WrappingInt(0) <= WrappingInt(0) ); - assert( WrappingInt(0) <= WrappingInt(1) ); - assert( !(WrappingInt(1) <= WrappingInt(0)) ); - assert( (WrappingInt(0xf0000000) <= WrappingInt(0)) ); - assert( (WrappingInt(0xf0000000) <= WrappingInt(9000)) ); - assert( !(WrappingInt(300) <= WrappingInt(0xe0000000)) ); - - assert( tdiff(3, 4) == 1 ); - assert( tdiff(4, 3) == -1 ); - assert( tdiff(0xffffffff, 0) == 1 ); - assert( isPrime(3) ); assert( isPrime(2) ); assert( isPrime(13) ); @@ -184,7 +181,8 @@ namespace mongo { /* note: can't use malloc herein - may be in signal handler. logLockless() likely does not comply and should still be fixed todo - */ + likewise class string? + */ void rawOut( const string &s ) { if( s.empty() ) return; diff --git a/util/version.cpp b/util/version.cpp index 4045cb5..a6efbd5 100644 --- a/util/version.cpp +++ b/util/version.cpp @@ -23,11 +23,63 @@ #include <string> #include "unittest.h" #include "version.h" +#include "stringutils.h" +#include "../db/jsobj.h" #include "file.h" +#include "ramlog.h" +#include "../db/cmdline.h" namespace mongo { - const char versionString[] = "1.8.3"; + /* Approved formats for versionString: + * 1.2.3 + * 1.2.3-pre- + * 1.2.3-rc4 (up to rc9) + * 1.2.3-rc4-pre- + * If you really need to do something else you'll need to fix _versionArray() + */ + const char versionString[] = "2.0.0"; + + // See unit test for example outputs + static BSONArray _versionArray(const char* version){ + // this is inefficient, but cached so it doesn't matter + BSONArrayBuilder b; + string curPart; + const char* c = version; + int finalPart = 0; // 0 = final release, -100 = pre, -10 to -1 = -10 + X for rcX + do { //walks versionString including NUL byte + if (!(*c == '.' || *c == '-' || *c == '\0')){ + curPart += *c; + continue; + } + + try { + unsigned num = stringToNum(curPart.c_str()); + b.append((int) num); + } + catch (...){ // not a number + if (curPart.empty()){ + assert(*c == '\0'); + break; + } + else if (startsWith(curPart, "rc")){ + finalPart = -10 + stringToNum(curPart.c_str()+2); + break; + } + else if (curPart == "pre"){ + finalPart = -100; + break; + } + } + + curPart = ""; + } while (*c++); + + b.append(finalPart); + return b.arr(); + } + + const BSONArray versionArray = _versionArray(versionString); string mongodVersion() { stringstream ss; @@ -61,38 +113,42 @@ namespace mongo { #endif void printSysInfo() { - log() << "build sys info: " << sysInfo() << endl; + log() << "build info: " << sysInfo() << endl; } + + static Tee * startupWarningsLog = new RamLog("startupWarnings"); //intentionally leaked + // - // 32 bit systems warning + // system warnings // void show_warnings() { - // each message adds a leading but not a trailing newline + // each message adds a leading and a trailing newline bool warned = false; { const char * foo = strchr( versionString , '.' ) + 1; int bar = atoi( foo ); if ( ( 2 * ( bar / 2 ) ) != bar ) { - cout << "\n** NOTE: This is a development version (" << versionString << ") of MongoDB."; - cout << "\n** Not recommended for production." << endl; + log() << startupWarningsLog; + log() << "** NOTE: This is a development version (" << versionString << ") of MongoDB." << startupWarningsLog; + log() << "** Not recommended for production." << startupWarningsLog; warned = true; } } if ( sizeof(int*) == 4 ) { - cout << endl; - cout << "** NOTE: when using MongoDB 32 bit, you are limited to about 2 gigabytes of data" << endl; - cout << "** see http://blog.mongodb.org/post/137788967/32-bit-limitations" << endl; - cout << "** with --dur, the limit is lower" << endl; + log() << startupWarningsLog; + log() << "** NOTE: when using MongoDB 32 bit, you are limited to about 2 gigabytes of data" << startupWarningsLog; + log() << "** see http://blog.mongodb.org/post/137788967/32-bit-limitations" << startupWarningsLog; + log() << "** with --journal, the limit is lower" << startupWarningsLog; warned = true; } #ifdef __linux__ if (boost::filesystem::exists("/proc/vz") && !boost::filesystem::exists("/proc/bc")) { - cout << endl; - cout << "** WARNING: You are running in OpenVZ. This is known to be broken!!!" << endl; + log() << startupWarningsLog; + log() << "** WARNING: You are running in OpenVZ. This is known to be broken!!!" << startupWarningsLog; warned = true; } @@ -122,22 +178,49 @@ namespace mongo { const char* space = strchr(line, ' '); if ( ! space ) { - cout << "** WARNING: cannot parse numa_maps" << endl; + log() << startupWarningsLog; + log() << "** WARNING: cannot parse numa_maps" << startupWarningsLog; warned = true; } else if ( ! startsWith(space+1, "interleave") ) { - cout << endl; - cout << "** WARNING: You are running on a NUMA machine." << endl; - cout << "** We suggest launching mongod like this to avoid performance problems:" << endl; - cout << "** numactl --interleave=all mongod [other options]" << endl; + log() << startupWarningsLog; + log() << "** WARNING: You are running on a NUMA machine." << startupWarningsLog; + log() << "** We suggest launching mongod like this to avoid performance problems:" << startupWarningsLog; + log() << "** numactl --interleave=all mongod [other options]" << startupWarningsLog; warned = true; } } } + + if (cmdLine.dur){ + fstream f ("/proc/sys/vm/overcommit_memory", ios_base::in); + unsigned val; + f >> val; + + if (val == 2) { + log() << startupWarningsLog; + log() << "** WARNING: /proc/sys/vm/overcommit_memory is " << val << startupWarningsLog; + log() << "** Journaling works best with it set to 0 or 1" << startupWarningsLog; + } + } + + if (boost::filesystem::exists("/proc/sys/vm/zone_reclaim_mode")){ + fstream f ("/proc/sys/vm/zone_reclaim_mode", ios_base::in); + unsigned val; + f >> val; + + if (val != 0) { + log() << startupWarningsLog; + log() << "** WARNING: /proc/sys/vm/zone_reclaim_mode is " << val << startupWarningsLog; + log() << "** We suggest setting it to 0" << startupWarningsLog; + log() << "** http://www.kernel.org/doc/Documentation/sysctl/vm.txt" << startupWarningsLog; + } + } #endif - if (warned) - cout << endl; + if (warned) { + log() << startupWarningsLog; + } } int versionCmp(StringData rhs, StringData lhs) { @@ -174,4 +257,28 @@ namespace mongo { log(1) << "versionCmpTest passed" << endl; } } versionCmpTest; + + class VersionArrayTest : public UnitTest { + public: + void run() { + assert( _versionArray("1.2.3") == BSON_ARRAY(1 << 2 << 3 << 0) ); + assert( _versionArray("1.2.0") == BSON_ARRAY(1 << 2 << 0 << 0) ); + assert( _versionArray("2.0.0") == BSON_ARRAY(2 << 0 << 0 << 0) ); + + assert( _versionArray("1.2.3-pre-") == BSON_ARRAY(1 << 2 << 3 << -100) ); + assert( _versionArray("1.2.0-pre-") == BSON_ARRAY(1 << 2 << 0 << -100) ); + assert( _versionArray("2.0.0-pre-") == BSON_ARRAY(2 << 0 << 0 << -100) ); + + assert( _versionArray("1.2.3-rc0") == BSON_ARRAY(1 << 2 << 3 << -10) ); + assert( _versionArray("1.2.0-rc1") == BSON_ARRAY(1 << 2 << 0 << -9) ); + assert( _versionArray("2.0.0-rc2") == BSON_ARRAY(2 << 0 << 0 << -8) ); + + // Note that the pre of an rc is the same as the rc itself + assert( _versionArray("1.2.3-rc3-pre-") == BSON_ARRAY(1 << 2 << 3 << -7) ); + assert( _versionArray("1.2.0-rc4-pre-") == BSON_ARRAY(1 << 2 << 0 << -6) ); + assert( _versionArray("2.0.0-rc5-pre-") == BSON_ARRAY(2 << 0 << 0 << -5) ); + + log(1) << "versionArrayTest passed" << endl; + } + } versionArrayTest; } diff --git a/util/version.h b/util/version.h index 779fbdc..64f8b14 100644 --- a/util/version.h +++ b/util/version.h @@ -4,11 +4,13 @@ #include <string> namespace mongo { + struct BSONArray; using std::string; // mongo version extern const char versionString[]; + extern const BSONArray versionArray; string mongodVersion(); int versionCmp(StringData rhs, StringData lhs); // like strcmp |