summaryrefslogtreecommitdiff
path: root/util
diff options
context:
space:
mode:
Diffstat (limited to 'util')
-rw-r--r--util/alignedbuilder.cpp41
-rw-r--r--util/alignedbuilder.h16
-rw-r--r--util/array.h7
-rw-r--r--util/assert_util.cpp71
-rw-r--r--util/assert_util.h76
-rw-r--r--util/background.cpp70
-rw-r--r--util/background.h49
-rw-r--r--util/bson_util.h42
-rw-r--r--util/bufreader.h2
-rw-r--r--util/checksum.h37
-rw-r--r--util/compress.cpp31
-rw-r--r--util/compress.h21
-rw-r--r--util/concurrency/list.h28
-rw-r--r--util/concurrency/mutex.h103
-rw-r--r--util/concurrency/race.h49
-rw-r--r--util/concurrency/rwlock.h250
-rw-r--r--[-rwxr-xr-x]util/concurrency/shared_mutex_win.hpp29
-rw-r--r--util/concurrency/spin_lock.cpp45
-rw-r--r--util/concurrency/spin_lock.h33
-rw-r--r--util/concurrency/synchronization.cpp36
-rw-r--r--util/concurrency/synchronization.h18
-rw-r--r--util/concurrency/value.h51
-rw-r--r--util/concurrency/vars.cpp4
-rw-r--r--util/file.h76
-rw-r--r--util/file_allocator.cpp73
-rw-r--r--util/file_allocator.h6
-rw-r--r--util/goodies.h154
-rw-r--r--util/hashtab.h14
-rw-r--r--util/log.cpp38
-rw-r--r--util/log.h53
-rw-r--r--util/logfile.cpp87
-rw-r--r--util/logfile.h3
-rw-r--r--util/message.cpp764
-rw-r--r--util/mmap.cpp30
-rw-r--r--util/mmap.h38
-rw-r--r--util/mmap_posix.cpp24
-rw-r--r--util/mmap_win.cpp13
-rwxr-xr-xutil/mongoutils/README2
-rw-r--r--util/mongoutils/str.h8
-rw-r--r--util/mongoutils/test.cpp2
-rw-r--r--util/net/hostandport.h (renamed from util/hostandport.h)40
-rw-r--r--util/net/httpclient.cpp (renamed from util/httpclient.cpp)45
-rw-r--r--util/net/httpclient.h (renamed from util/httpclient.h)16
-rw-r--r--util/net/listen.cpp391
-rw-r--r--util/net/listen.h190
-rw-r--r--util/net/message.cpp64
-rw-r--r--util/net/message.h (renamed from util/message.h)210
-rw-r--r--util/net/message_port.cpp298
-rw-r--r--util/net/message_port.h107
-rw-r--r--util/net/message_server.h (renamed from util/message_server.h)16
-rw-r--r--util/net/message_server_asio.cpp (renamed from util/message_server_asio.cpp)0
-rw-r--r--util/net/message_server_port.cpp (renamed from util/message_server_port.cpp)66
-rw-r--r--util/net/miniwebserver.cpp (renamed from util/miniwebserver.cpp)23
-rw-r--r--util/net/miniwebserver.h (renamed from util/miniwebserver.h)10
-rw-r--r--util/net/sock.cpp713
-rw-r--r--util/net/sock.h256
-rw-r--r--util/optime.h36
-rw-r--r--util/paths.h42
-rw-r--r--util/processinfo.h6
-rw-r--r--util/processinfo_darwin.cpp5
-rw-r--r--util/processinfo_win32.cpp3
-rw-r--r--util/queue.h6
-rw-r--r--util/ramlog.cpp190
-rw-r--r--util/ramlog.h144
-rw-r--r--util/sock.cpp235
-rw-r--r--util/sock.h303
-rw-r--r--util/stringutils.h102
-rw-r--r--util/time_support.h80
-rw-r--r--util/timer.h83
-rw-r--r--util/util.cpp22
-rw-r--r--util/version.cpp145
-rw-r--r--util/version.h2
72 files changed, 4114 insertions, 2229 deletions
diff --git a/util/alignedbuilder.cpp b/util/alignedbuilder.cpp
index 1734431..b2e0461 100644
--- a/util/alignedbuilder.cpp
+++ b/util/alignedbuilder.cpp
@@ -29,6 +29,35 @@ namespace mongo {
BOOST_STATIC_ASSERT(sizeof(void*) == sizeof(size_t));
+ /** Prepare the builder for re-use by setting the logical length back to zero.
+     On the occasions the RARELY block runs, a buffer whose size exceeds 128MB
+     is reallocated down to 128MB. */
+ void AlignedBuilder::reset() {
+     _len = 0;
+     RARELY {
+         const unsigned maxRetainedBytes = 128*1024*1024;
+         if (_p._size > maxRetainedBytes) {
+             _realloc(maxRetainedBytes, _len);
+         }
+     }
+ }
+
+ /** Reset for re-use, with a hint of how many bytes are about to be needed.
+     The hint is rounded up to a multiple of 32MB and the buffer is reallocated
+     to that size, except that a larger existing buffer is kept when it is
+     64MB or less, or when the RARELY block does not run on this call. */
+ void AlignedBuilder::reset(unsigned sz) {
+     _len = 0;
+     // round the hint up to the next multiple of 32MB
+     unsigned roundMask = 32 * 1024 * 1024 - 1;
+     unsigned target = (sz+roundMask) & (~roundMask);
+     if( _p._size == target ) {
+         // already exactly the wanted size
+         return;
+     }
+     if( _p._size > target ) {
+         // buffers of 64MB or less are never shrunk
+         if( _p._size <= 64 * 1024 * 1024 ) {
+             return;
+         }
+         // bigger buffers are shrunk only when RARELY lets the block run
+         bool shrinkNow = false;
+         RARELY { shrinkNow = true; }
+         if( !shrinkNow ) {
+             return;
+         }
+     }
+     _realloc(target, _len);
+ }
+
void AlignedBuilder::mallocSelfAligned(unsigned sz) {
assert( sz == _p._size );
void *p = malloc(sz + Alignment - 1);
@@ -44,10 +73,20 @@ namespace mongo {
/* "slow"/infrequent portion of 'grow()' */
void NOINLINE_DECL AlignedBuilder::growReallocate(unsigned oldLen) {
+ dassert( _len > _p._size );
unsigned a = _p._size;
assert( a );
while( 1 ) {
- a *= 2;
+ if( a < 128 * 1024 * 1024 )
+ a *= 2;
+ else if( sizeof(int*) == 4 )
+ a += 32 * 1024 * 1024;
+ else
+ a += 64 * 1024 * 1024;
+ DEV if( a > 256*1024*1024 ) {
+ log() << "dur AlignedBuilder too big, aborting in _DEBUG build" << endl;
+ abort();
+ }
wassert( a <= 256*1024*1024 );
assert( a <= 512*1024*1024 );
if( _len < a )
diff --git a/util/alignedbuilder.h b/util/alignedbuilder.h
index 452cec2..1d246a9 100644
--- a/util/alignedbuilder.h
+++ b/util/alignedbuilder.h
@@ -28,13 +28,11 @@ namespace mongo {
AlignedBuilder(unsigned init_size);
~AlignedBuilder() { kill(); }
+ /** reset with a hint as to the upcoming needed size specified */
+ void reset(unsigned sz);
+
/** reset for a re-use. shrinks if > 128MB */
- void reset() {
- _len = 0;
- const unsigned sizeCap = 128*1024*1024;
- if (_p._size > sizeCap)
- _realloc(sizeCap, _len);
- }
+ void reset();
/** note this may be deallocated (realloced) if you keep writing or reset(). */
const char* buf() const { return _p._data; }
@@ -48,8 +46,12 @@ namespace mongo {
return l;
}
+ /** if buffer grows pointer no longer valid */
char* atOfs(unsigned ofs) { return _p._data + ofs; }
+ /** if buffer grows pointer no longer valid */
+ char* cur() { return _p._data + _len; }
+
void appendChar(char j) {
*((char*)grow(sizeof(char))) = j;
}
@@ -99,7 +101,7 @@ namespace mongo {
inline char* grow(unsigned by) {
unsigned oldlen = _len;
_len += by;
- if ( _len > _p._size ) {
+ if (MONGO_unlikely( _len > _p._size )) {
growReallocate(oldlen);
}
return _p._data + oldlen;
diff --git a/util/array.h b/util/array.h
index bf705a4..1282225 100644
--- a/util/array.h
+++ b/util/array.h
@@ -18,6 +18,12 @@
namespace mongo {
+ /*
+ * simple array class that does no allocations
+ * same api as vector
+ * fixed buffer, so once capacity is exceeded, will assert
+ * meant to be-reused with clear()
+ */
template<typename T>
class FastArray {
public:
@@ -44,6 +50,7 @@ namespace mongo {
}
void push_back( const T& t ) {
+ assert( _size < _capacity );
_data[_size++] = t;
}
diff --git a/util/assert_util.cpp b/util/assert_util.cpp
index 8280d8b..da039c0 100644
--- a/util/assert_util.cpp
+++ b/util/assert_util.cpp
@@ -62,18 +62,34 @@ namespace mongo {
b.append( c , code );
}
-
string getDbContext();
/* "warning" assert -- safe to continue, so we don't throw exception. */
- void wasserted(const char *msg, const char *file, unsigned line) {
- problem() << "Assertion failure " << msg << ' ' << file << ' ' << dec << line << endl;
+ NOINLINE_DECL void wasserted(const char *msg, const char *file, unsigned line) {
+ static bool rateLimited;
+ static time_t lastWhen;
+ static unsigned lastLine;
+ if( lastLine == line && time(0)-lastWhen < 5 ) {
+ if( rateLimited++ == 0 ) {
+ log() << "rate limiting wassert" << endl;
+ }
+ return;
+ }
+ lastWhen = time(0);
+ lastLine = line;
+
+ problem() << "warning assertion failure " << msg << ' ' << file << ' ' << dec << line << endl;
sayDbContext();
raiseError(0,msg && *msg ? msg : "wassertion failure");
assertionCount.condrollover( ++assertionCount.warning );
+#if defined(_DEBUG) || defined(_DURABLEDEFAULTON) || defined(_DURABLEDEFAULTOFF)
+ // this is so we notice in buildbot
+ log() << "\n\n***aborting after wassert() failure in a debug/test build\n\n" << endl;
+ abort();
+#endif
}
- void asserted(const char *msg, const char *file, unsigned line) {
+ NOINLINE_DECL void asserted(const char *msg, const char *file, unsigned line) {
assertionCount.condrollover( ++assertionCount.regular );
problem() << "Assertion failure " << msg << ' ' << file << ' ' << dec << line << endl;
sayDbContext();
@@ -82,6 +98,28 @@ namespace mongo {
temp << "assertion " << file << ":" << line;
AssertionException e(temp.str(),0);
breakpoint();
+#if defined(_DEBUG) || defined(_DURABLEDEFAULTON) || defined(_DURABLEDEFAULTOFF)
+ // this is so we notice in buildbot
+ log() << "\n\n***aborting after assert() failure in a debug/test build\n\n" << endl;
+ abort();
+#endif
+ throw e;
+ }
+
+ NOINLINE_DECL void verifyFailed( int msgid ) {
+ assertionCount.condrollover( ++assertionCount.regular );
+ problem() << "Assertion failure " << msgid << endl;
+ sayDbContext();
+ raiseError(0,"assertion failure");
+ stringstream temp;
+ temp << msgid;
+ AssertionException e(temp.str(),0);
+ breakpoint();
+#if defined(_DEBUG) || defined(_DURABLEDEFAULTON) || defined(_DURABLEDEFAULTOFF)
+ // this is so we notice in buildbot
+ log() << "\n\n***aborting after verify() failure in a debug/test build\n\n" << endl;
+ abort();
+#endif
throw e;
}
@@ -89,14 +127,14 @@ namespace mongo {
raiseError(0,msg);
}
- void uasserted(int msgid, const char *msg) {
+ NOINLINE_DECL void uasserted(int msgid, const char *msg) {
assertionCount.condrollover( ++assertionCount.user );
LOG(1) << "User Assertion: " << msgid << ":" << msg << endl;
raiseError(msgid,msg);
throw UserException(msgid, msg);
}
- void msgasserted(int msgid, const char *msg) {
+ NOINLINE_DECL void msgasserted(int msgid, const char *msg) {
assertionCount.condrollover( ++assertionCount.warning );
tlog() << "Assertion: " << msgid << ":" << msg << endl;
raiseError(msgid,msg && *msg ? msg : "massert failure");
@@ -105,14 +143,14 @@ namespace mongo {
throw MsgAssertionException(msgid, msg);
}
- void msgassertedNoTrace(int msgid, const char *msg) {
+ NOINLINE_DECL void msgassertedNoTrace(int msgid, const char *msg) {
assertionCount.condrollover( ++assertionCount.warning );
log() << "Assertion: " << msgid << ":" << msg << endl;
raiseError(msgid,msg && *msg ? msg : "massert failure");
throw MsgAssertionException(msgid, msg);
}
- void streamNotGood( int code , string msg , std::ios& myios ) {
+ NOINLINE_DECL void streamNotGood( int code , string msg , std::ios& myios ) {
stringstream ss;
// errno might not work on all systems for streams
// if it doesn't for a system should deal with here
@@ -144,5 +182,22 @@ namespace mongo {
#endif
}
+ /** Build "<msg><ch>" in buf. msg must be shorter than 128 characters (asserted). */
+ NOINLINE_DECL ErrorMsg::ErrorMsg(const char *msg, char ch) {
+     int msgLen = strlen(msg);
+     assert( msgLen < 128);
+     memcpy(buf, msg, msgLen);
+     // append the single character, then the terminator
+     buf[msgLen] = ch;
+     buf[msgLen + 1] = 0;
+ }
+
+ /** Build "<msg><val>" in buf, with val printed in decimal. msg must be shorter
+     than 128 characters (asserted). */
+ NOINLINE_DECL ErrorMsg::ErrorMsg(const char *msg, unsigned val) {
+     int msgLen = strlen(msg);
+     assert( msgLen < 128);
+     memcpy(buf, msg, msgLen);
+     // sprintf writes the digits and the terminating 0 right after the copied text
+     sprintf(buf + msgLen, "%u", val);
+ }
+
}
diff --git a/util/assert_util.h b/util/assert_util.h
index 151e950..b4c68b7 100644
--- a/util/assert_util.h
+++ b/util/assert_util.h
@@ -20,6 +20,13 @@
#include "../db/lasterror.h"
+// MONGO_NORETURN undefed at end of file
+#ifdef __GNUC__
+# define MONGO_NORETURN __attribute__((__noreturn__))
+#else
+# define MONGO_NORETURN
+#endif
+
namespace mongo {
enum CommonErrorCodes {
@@ -53,11 +60,28 @@ namespace mongo {
void append( BSONObjBuilder& b , const char * m = "$err" , const char * c = "code" ) const ;
string toString() const { stringstream ss; ss << "exception: " << code << " " << msg; return ss.str(); }
bool empty() const { return msg.empty(); }
+
+ void reset(){ msg = ""; code=-1; }
string msg;
int code;
};
+ /** helper class that builds error strings. lighter weight than a StringBuilder, albeit less flexible.
+ NOINLINE_DECL used in the constructor implementations as we are assuming this is a cold code path when used.
+
+ The text is assembled in a fixed 256 byte member buffer. The constructors (see assert_util.cpp)
+ assert that msg is shorter than 128 characters before copying it.
+
+ example:
+ throw UserException(123, ErrorMsg("blah", num_val));
+ */
+ class ErrorMsg {
+ public:
+ /** result is msg followed by the single character ch */
+ ErrorMsg(const char *msg, char ch);
+ /** result is msg followed by val formatted with "%u" */
+ ErrorMsg(const char *msg, unsigned val);
+ /** copies the assembled text into a string */
+ operator string() const { return buf; }
+ private:
+ // holds the 0-terminated message
+ char buf[256];
+ };
+
class DBException : public std::exception {
public:
DBException( const ExceptionInfo& ei ) : _ei(ei) {}
@@ -117,14 +141,14 @@ namespace mongo {
virtual void appendPrefix( stringstream& ss ) const { ss << "massert:"; }
};
-
- void asserted(const char *msg, const char *file, unsigned line);
+ void asserted(const char *msg, const char *file, unsigned line) MONGO_NORETURN;
void wasserted(const char *msg, const char *file, unsigned line);
-
+ void verifyFailed( int msgid );
+
/** a "user assertion". throws UserAssertion. logs. typically used for errors that a user
- could cause, such as dupliate key, disk full, etc.
+ could cause, such as duplicate key, disk full, etc.
*/
- void uasserted(int msgid, const char *msg);
+ void uasserted(int msgid, const char *msg) MONGO_NORETURN;
inline void uasserted(int msgid , string msg) { uasserted(msgid, msg.c_str()); }
/** reported via lasterror, but don't throw exception */
@@ -133,24 +157,33 @@ namespace mongo {
/** msgassert and massert are for errors that are internal but have a well defined error text string.
a stack trace is logged.
*/
- void msgassertedNoTrace(int msgid, const char *msg);
+ void msgassertedNoTrace(int msgid, const char *msg) MONGO_NORETURN;
inline void msgassertedNoTrace(int msgid, const string& msg) { msgassertedNoTrace( msgid , msg.c_str() ); }
- void msgasserted(int msgid, const char *msg);
+ void msgasserted(int msgid, const char *msg) MONGO_NORETURN;
inline void msgasserted(int msgid, string msg) { msgasserted(msgid, msg.c_str()); }
+ /* convert various types of exceptions to strings */
+ inline string causedBy( const char* e ){ return (string)" :: caused by :: " + e; }
+ inline string causedBy( const DBException& e ){ return causedBy( e.toString().c_str() ); }
+ inline string causedBy( const std::exception& e ){ return causedBy( e.what() ); }
+ inline string causedBy( const string& e ){ return causedBy( e.c_str() ); }
+
+ /** in the mongodb source, use verify() instead of assert(). verify is always evaluated even in release builds. */
+ inline void verify( int msgid , bool testOK ) { if ( ! testOK ) verifyFailed( msgid ); }
+
#ifdef assert
#undef assert
#endif
-#define MONGO_assert(_Expression) (void)( (!!(_Expression)) || (mongo::asserted(#_Expression, __FILE__, __LINE__), 0) )
+#define MONGO_assert(_Expression) (void)( MONGO_likely(!!(_Expression)) || (mongo::asserted(#_Expression, __FILE__, __LINE__), 0) )
#define assert MONGO_assert
/* "user assert". if asserts, user did something wrong, not our code */
-#define MONGO_uassert(msgid, msg, expr) (void)( (!!(expr)) || (mongo::uasserted(msgid, msg), 0) )
+#define MONGO_uassert(msgid, msg, expr) (void)( MONGO_likely(!!(expr)) || (mongo::uasserted(msgid, msg), 0) )
#define uassert MONGO_uassert
/* warning only - keeps going */
-#define MONGO_wassert(_Expression) (void)( (!!(_Expression)) || (mongo::wasserted(#_Expression, __FILE__, __LINE__), 0) )
+#define MONGO_wassert(_Expression) (void)( MONGO_likely(!!(_Expression)) || (mongo::wasserted(#_Expression, __FILE__, __LINE__), 0) )
#define wassert MONGO_wassert
/* display a message, no context, and throw assertionexception
@@ -158,7 +191,7 @@ namespace mongo {
easy way to throw an exception and log something without our stack trace
display happening.
*/
-#define MONGO_massert(msgid, msg, expr) (void)( (!!(expr)) || (mongo::msgasserted(msgid, msg), 0) )
+#define MONGO_massert(msgid, msg, expr) (void)( MONGO_likely(!!(expr)) || (mongo::msgasserted(msgid, msg), 0) )
#define massert MONGO_massert
/* dassert is 'debug assert' -- might want to turn off for production as these
@@ -179,7 +212,7 @@ namespace mongo {
enum { ASSERT_ID_DUPKEY = 11000 };
/* throws a uassertion with an appropriate msg */
- void streamNotGood( int code , string msg , std::ios& myios );
+ void streamNotGood( int code , string msg , std::ios& myios ) MONGO_NORETURN;
inline void assertStreamGood(unsigned msgid, string msg, std::ios& myios) {
if( !myios.good() ) streamNotGood(msgid, msg, myios);
@@ -195,10 +228,21 @@ namespace mongo {
expression; \
} catch ( const std::exception &e ) { \
stringstream ss; \
- ss << "caught boost exception: " << e.what(); \
- msgasserted( 13294 , ss.str() ); \
+ ss << "caught boost exception: " << e.what() << ' ' << __FILE__ << ' ' << __LINE__; \
+ msgasserted( 13294 , ss.str() ); \
+ } catch ( ... ) { \
+ massert( 10437 , "unknown boost failed" , false ); \
+ }
+
+#define MONGO_BOOST_CHECK_EXCEPTION_WITH_MSG( expression, msg ) \
+ try { \
+ expression; \
+ } catch ( const std::exception &e ) { \
+ stringstream ss; \
+ ss << msg << " caught boost exception: " << e.what(); \
+ msgasserted( 14043 , ss.str() ); \
} catch ( ... ) { \
- massert( 10437 , "unknown boost failed" , false ); \
+ msgasserted( 14044 , string("unknown boost failed ") + msg ); \
}
#define DESTRUCTOR_GUARD MONGO_DESTRUCTOR_GUARD
@@ -210,3 +254,5 @@ namespace mongo {
} catch ( ... ) { \
problem() << "caught unknown exception in destructor (" << __FUNCTION__ << ")" << endl; \
}
+
+#undef MONGO_NORETURN
diff --git a/util/background.cpp b/util/background.cpp
index 746d14c..215b271 100644
--- a/util/background.cpp
+++ b/util/background.cpp
@@ -18,8 +18,11 @@
#include "pch.h"
#include "concurrency/mutex.h"
+#include "concurrency/spin_lock.h"
#include "background.h"
+#include "time_support.h"
+#include "timer.h"
#include "mongoutils/str.h"
@@ -80,6 +83,7 @@ namespace mongo {
}
bool BackgroundJob::wait( unsigned msTimeOut ) {
+ assert( !_status->deleteSelf ); // you cannot call wait on a self-deleting job
scoped_lock l( _status->m );
while ( _status->state != Done ) {
if ( msTimeOut ) {
@@ -117,4 +121,70 @@ namespace mongo {
return _status->state == Running;
}
+ // -------------------------
+
+ /** Registers this task with the shared Runner, creating the Runner first if
+     none exists yet. */
+ PeriodicTask::PeriodicTask() {
+     if ( theRunner == 0 ) {
+         theRunner = new Runner();
+     }
+     theRunner->add( this );
+ }
+
+ /** Unregisters this task from the shared Runner. */
+ PeriodicTask::~PeriodicTask() {
+     Runner* const runner = theRunner;
+     runner->remove( this );
+ }
+
+ /** Appends task to the task list while holding the runner's spin lock. */
+ void PeriodicTask::Runner::add( PeriodicTask* task ) {
+     scoped_spinlock guard( _lock );
+     _tasks.push_back( task );
+ }
+
+ /** Clears the first slot holding task. The slot is set to null rather than
+     erased, so the list never shrinks. Does nothing if task is not present. */
+ void PeriodicTask::Runner::remove( PeriodicTask* task ) {
+     scoped_spinlock guard( _lock );
+     const size_t count = _tasks.size();
+     size_t slot = 0;
+     while ( slot < count && _tasks[slot] != task ) {
+         slot++;
+     }
+     if ( slot < count ) {
+         _tasks[slot] = 0;
+     }
+ }
+
+ /** Body of the runner: until shutdown, sleep, then call taskDoWork() on every
+ registered (non-null) task in list order, logging how long each one took.
+ Exceptions thrown by a task are logged and do not stop the loop. */
+ void PeriodicTask::Runner::run() {
+ int sleeptime = 60;
+ DEV sleeptime = 5; // to catch race conditions
+
+ while ( ! inShutdown() ) {
+
+ sleepsecs( sleeptime );
+
+ // the spin lock is held for the whole pass, including while each task runs,
+ // so add() and remove() wait until the pass finishes
+ scoped_spinlock lk( _lock );
+
+ size_t size = _tasks.size();
+
+ for ( size_t i=0; i<size; i++ ) {
+ PeriodicTask * t = _tasks[i];
+ // null slots are left behind by remove()
+ if ( ! t )
+ continue;
+
+ // stop starting new tasks once shutdown has begun
+ if ( inShutdown() )
+ break;
+
+ Timer timer;
+ try {
+ t->taskDoWork();
+ }
+ catch ( std::exception& e ) {
+ error() << "task: " << t->taskName() << " failed: " << e.what() << endl;
+ }
+ catch ( ... ) {
+ error() << "task: " << t->taskName() << " failed with unknown error" << endl;
+ }
+
+ // tasks that took 3ms or less are logged at level 1, slower ones at level 0
+ int ms = timer.millis();
+ LOG( ms <= 3 ? 1 : 0 ) << "task: " << t->taskName() << " took: " << ms << "ms" << endl;
+ }
+ }
+ }
+
+ PeriodicTask::Runner* PeriodicTask::theRunner = 0;
+
} // namespace mongo
diff --git a/util/background.h b/util/background.h
index 861df9b..496a1f4 100644
--- a/util/background.h
+++ b/util/background.h
@@ -17,6 +17,8 @@
#pragma once
+#include "concurrency/spin_lock.h"
+
namespace mongo {
/**
@@ -102,5 +104,52 @@ namespace mongo {
void jobBody( boost::shared_ptr<JobStatus> status );
};
+
+ /**
+ * these run "roughly" every minute
+ * instantiate statically
+ * class MyTask : public PeriodicTask {
+ * public:
+ * virtual string taskName() const { return "MyTask"; }
+ * virtual void taskDoWork() { log() << "hi" << endl; }
+ * } myTask;
+ */
+ class PeriodicTask {
+ public:
+ // registers this task with theRunner, creating the Runner on first use (see background.cpp)
+ PeriodicTask();
+ // unregisters this task from theRunner
+ virtual ~PeriodicTask();
+
+ // the work to perform; Runner::run() logs any exception it throws and carries on
+ virtual void taskDoWork() = 0;
+ // name used by Runner::run() in its log messages
+ virtual string taskName() const = 0;
+
+ // background job that calls taskDoWork() on every registered task in turn
+ class Runner : public BackgroundJob {
+ public:
+ virtual ~Runner(){}
+
+ virtual string name() const { return "PeriodicTask::Runner"; }
+
+ virtual void run();
+
+ // both take _lock
+ void add( PeriodicTask* task );
+ void remove( PeriodicTask* task );
+
+ private:
+
+ // guards _tasks
+ SpinLock _lock;
+
+ // these are NOT owned by Runner
+ // Runner will not delete these
+ // this never gets smaller
+ // only fields replaced with nulls
+ vector<PeriodicTask*> _tasks;
+
+ };
+
+ // the single shared Runner; 0 until the first PeriodicTask is constructed
+ static Runner* theRunner;
+
+ };
+
+
+
} // namespace mongo
diff --git a/util/bson_util.h b/util/bson_util.h
new file mode 100644
index 0000000..973e31f
--- /dev/null
+++ b/util/bson_util.h
@@ -0,0 +1,42 @@
+// bson_util.h
+
+/* Copyright 2009 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "../pch.h"
+
+namespace mongo {
+
+/** Appends the numeric content of el to results, cast to T.
+ *  An Array element contributes Number() of each of its members, in order;
+ *  a numeric element contributes its own Number(); any other element
+ *  contributes nothing.
+ */
+template <typename T>
+void bsonArrToNumVector(BSONElement el, vector<T>& results){
+
+    if(el.type() == Array){
+
+        vector<BSONElement> members = el.Array();
+
+        vector<BSONElement>::iterator cursor = members.begin();
+        while(cursor != members.end()){
+            results.push_back( (T) cursor->Number() );
+            ++cursor;
+        }
+        return;
+    }
+
+    if(el.isNumber()){
+        results.push_back( (T) el.Number() );
+    }
+
+}
+
+
+}
diff --git a/util/bufreader.h b/util/bufreader.h
index a0dcefa..53f0ba7 100644
--- a/util/bufreader.h
+++ b/util/bufreader.h
@@ -28,6 +28,7 @@ namespace mongo {
public:
class eof : public std::exception {
public:
+ eof() { }
virtual const char * what() { return "BufReader eof"; }
};
@@ -88,6 +89,7 @@ namespace mongo {
}
const void* pos() { return _pos; }
+ const void* start() { return _start; }
private:
const void *_start;
diff --git a/util/checksum.h b/util/checksum.h
new file mode 100644
index 0000000..009ab56
--- /dev/null
+++ b/util/checksum.h
@@ -0,0 +1,37 @@
+#pragma once
+#include "../pch.h"
+namespace mongo {
+ /** a simple, rather dumb, but very fast checksum. see perftests.cpp for unit tests.
+ The value is 16 bytes, viewable either as bytes[] or as two unsigned long long words[]. */
+ struct Checksum {
+ union {
+ unsigned char bytes[16];
+ unsigned long long words[2];
+ };
+
+ // if you change this you must bump dur::CurrentVersion
+ /** compute the checksum of the len bytes at buf and store it in words[].
+ @param buf data to checksum; a warning assertion fires if it is not 8 byte aligned
+ @param len number of bytes at buf
+ */
+ void gen(const void *buf, unsigned len) {
+ wassert( ((size_t)buf) % 8 == 0 ); // performance warning
+ // n = number of 8 byte words in each of the two halves that are summed separately
+ unsigned n = len / 8 / 2;
+ const unsigned long long *p = (const unsigned long long *) buf;
+ // a: sum over the first n words, each xor'd with its index
+ unsigned long long a = 0;
+ for( unsigned i = 0; i < n; i++ ) {
+ a += (*p ^ i);
+ p++;
+ }
+ // b: the same over the next n words (index restarts at 0)
+ unsigned long long b = 0;
+ for( unsigned i = 0; i < n; i++ ) {
+ b += (*p ^ i);
+ p++;
+ }
+ // c: the tail bytes after the two halves, i.e. len % 16 of them. c is shifted left 8 bits
+ // per byte, so with more than 8 tail bytes only the last 8 remain.
+ // NOTE(review): the byte is read as plain char; where char is signed, a byte >= 0x80
+ // sign-extends before the OR. confirm this is intended, since it is part of the format.
+ unsigned long long c = 0;
+ for( unsigned i = n * 2 * 8; i < len; i++ ) { // tail bytes (len % 16 of them)
+ c = (c << 8) | ((const char *)buf)[i];
+ }
+ words[0] = a ^ len;
+ words[1] = b ^ c;
+ }
+
+ // two checksums are equal when both words match
+ bool operator==(const Checksum& rhs) const { return words[0]==rhs.words[0] && words[1]==rhs.words[1]; }
+ bool operator!=(const Checksum& rhs) const { return words[0]!=rhs.words[0] || words[1]!=rhs.words[1]; }
+ };
+}
diff --git a/util/compress.cpp b/util/compress.cpp
new file mode 100644
index 0000000..bcde488
--- /dev/null
+++ b/util/compress.cpp
@@ -0,0 +1,31 @@
+// @file compress.cpp
+
+#include "../third_party/snappy/snappy.h"
+#include "compress.h"
+#include <string>
+#include <string.h>
+#include <assert.h>
+
+namespace mongo {
+
+ // Each function below forwards its arguments unchanged to the snappy function
+ // it names; see the snappy headers for the exact contracts.
+
+ /** forwards to snappy::RawCompress */
+ void rawCompress(const char* input,
+ size_t input_length,
+ char* compressed,
+ size_t* compressed_length)
+ {
+ snappy::RawCompress(input, input_length, compressed, compressed_length);
+ }
+
+ /** forwards to snappy::MaxCompressedLength and returns its result */
+ size_t maxCompressedLength(size_t source_len) {
+ return snappy::MaxCompressedLength(source_len);
+ }
+
+ /** forwards to snappy::Compress and returns its result */
+ size_t compress(const char* input, size_t input_length, std::string* output) {
+ return snappy::Compress(input, input_length, output);
+ }
+
+ /** forwards to snappy::Uncompress and returns its result */
+ bool uncompress(const char* compressed, size_t compressed_length, std::string* uncompressed) {
+ return snappy::Uncompress(compressed, compressed_length, uncompressed);
+ }
+
+}
diff --git a/util/compress.h b/util/compress.h
new file mode 100644
index 0000000..5bc5a33
--- /dev/null
+++ b/util/compress.h
@@ -0,0 +1,21 @@
+// @file compress.h
+
+#pragma once
+
+#include <string>
+
+namespace mongo {
+
+ // Compression helpers. The definitions in compress.cpp forward to the snappy library.
+
+ /** compress input_length bytes at input into *output; returns what snappy::Compress returns */
+ size_t compress(const char* input, size_t input_length, std::string* output);
+
+ /** uncompress compressed_length bytes at compressed into *uncompressed; returns what snappy::Uncompress returns */
+ bool uncompress(const char* compressed, size_t compressed_length, std::string* uncompressed);
+
+ /** returns snappy::MaxCompressedLength(source_len) */
+ size_t maxCompressedLength(size_t source_len);
+ /** raw-buffer variant; forwards to snappy::RawCompress.
+ NOTE(review): presumably compressed must hold maxCompressedLength(input_length) bytes -- confirm against snappy docs */
+ void rawCompress(const char* input,
+ size_t input_length,
+ char* compressed,
+ size_t* compressed_length);
+
+}
+
+
diff --git a/util/concurrency/list.h b/util/concurrency/list.h
index e5eaec6..01bae6f 100644
--- a/util/concurrency/list.h
+++ b/util/concurrency/list.h
@@ -42,38 +42,54 @@ namespace mongo {
friend class List1;
T *_next;
public:
+ Base() : _next(0){}
+ ~Base() { wassert(false); } // we never want this to happen
T* next() const { return _next; }
};
- T* head() const { return _head; }
+ /** note this is safe:
+
+ T* p = mylist.head();
+ if( p )
+ use(p);
+
+ and this is not:
+
+ if( mylist.head() )
+ use( mylist.head() ); // could become 0
+ */
+ T* head() const { return (T*) _head; }
void push(T* t) {
+ assert( t->_next == 0 );
scoped_lock lk(_m);
- t->_next = _head;
+ t->_next = (T*) _head;
_head = t;
}
- // intentionally leak.
+ // intentionally leaks.
void orphanAll() {
+ scoped_lock lk(_m);
_head = 0;
}
/* t is not deleted, but is removed from the list. (orphaned) */
void orphan(T* t) {
scoped_lock lk(_m);
- T *&prev = _head;
+ T *&prev = (T*&) _head;
T *n = prev;
while( n != t ) {
+ uassert( 14050 , "List1: item to orphan not in list", n );
prev = n->_next;
n = prev;
}
prev = t->_next;
if( ++_orphans > 500 )
- log() << "warning orphans=" << _orphans << '\n';
+ log() << "warning List1 orphans=" << _orphans << '\n';
}
private:
- T *_head;
+ volatile T *_head;
mongo::mutex _m;
int _orphans;
};
diff --git a/util/concurrency/mutex.h b/util/concurrency/mutex.h
index c463498..f17c3f0 100644
--- a/util/concurrency/mutex.h
+++ b/util/concurrency/mutex.h
@@ -19,11 +19,12 @@
#include <map>
#include <set>
-
#include "../heapcheck.h"
namespace mongo {
+ void printStackTrace( ostream &o );
+
class mutex;
inline boost::xtime incxtimemillis( long long s ) {
@@ -50,7 +51,6 @@ namespace mongo {
map< mid, set<mid> > followers;
boost::mutex &x;
unsigned magic;
-
void aBreakPoint() { } // for debugging
public:
// set these to create an assert that
@@ -147,20 +147,16 @@ namespace mongo {
~StaticObserver() { _destroyingStatics = true; }
};
- /** On pthread systems, it is an error to destroy a mutex while held. Static global
- * mutexes may be held upon shutdown in our implementation, and this way we avoid
- * destroying them.
- * NOT recursive.
+ /** On pthread systems, it is an error to destroy a mutex while held (boost mutex
+ * may use pthread). Static global mutexes may be held upon shutdown in our
+ * implementation, and this way we avoid destroying them.
+ * NOT recursive.
*/
class mutex : boost::noncopyable {
public:
#if defined(_DEBUG)
const char * const _name;
-#endif
-
-#if defined(_DEBUG)
- mutex(const char *name)
- : _name(name)
+ mutex(const char *name) : _name(name)
#else
mutex(const char *)
#endif
@@ -184,44 +180,47 @@ namespace mongo {
#else
ok( _l.locked() )
#endif
- {
- }
-
- ~try_lock() {
- }
-
+ { }
private:
boost::timed_mutex::scoped_timed_lock _l;
-
public:
const bool ok;
};
-
class scoped_lock : boost::noncopyable {
+ public:
#if defined(_DEBUG)
- mongo::mutex *mut;
+ struct PostStaticCheck {
+ PostStaticCheck() {
+ if ( StaticObserver::_destroyingStatics ) {
+ cout << "trying to lock a mongo::mutex during static shutdown" << endl;
+ printStackTrace( cout );
+ }
+ }
+ };
+
+ PostStaticCheck _check;
+ mongo::mutex * const _mut;
#endif
- public:
- scoped_lock( mongo::mutex &m ) : _l( m.boost() ) {
+ scoped_lock( mongo::mutex &m ) :
+#if defined(_DEBUG)
+ _mut(&m),
+#endif
+ _l( m.boost() ) {
#if defined(_DEBUG)
- mut = &m;
- mutexDebugger.entering(mut->_name);
+ mutexDebugger.entering(_mut->_name);
#endif
}
~scoped_lock() {
#if defined(_DEBUG)
- mutexDebugger.leaving(mut->_name);
+ mutexDebugger.leaving(_mut->_name);
#endif
}
boost::timed_mutex::scoped_lock &boost() { return _l; }
private:
boost::timed_mutex::scoped_lock _l;
};
-
-
private:
-
boost::timed_mutex &boost() { return *_m; }
boost::timed_mutex *_m;
};
@@ -229,4 +228,52 @@ namespace mongo {
typedef mutex::scoped_lock scoped_lock;
typedef boost::recursive_mutex::scoped_lock recursive_scoped_lock;
+ /** The concept with SimpleMutex is that it is a basic lock/unlock with no
+ special functionality (such as try and try timeout). Thus it can be
+ implemented using OS-specific facilities in all environments (if desired).
+ On Windows, the implementation below is faster than boost mutex.
+ */
+#if defined(_WIN32)
+ /** Windows SimpleMutex: a plain lock/unlock mutex on top of a CRITICAL_SECTION.
+     The name passed to the constructor is not used by this implementation. */
+ class SimpleMutex : boost::noncopyable {
+     CRITICAL_SECTION _cs;
+ public:
+     SimpleMutex(const char *name) {
+         InitializeCriticalSection(&_cs);
+     }
+     ~SimpleMutex() {
+         DeleteCriticalSection(&_cs);
+     }
+
+     void lock() {
+         EnterCriticalSection(&_cs);
+     }
+     void unlock() {
+         LeaveCriticalSection(&_cs);
+     }
+
+     /** locks the mutex on construction and unlocks it on destruction */
+     class scoped_lock : boost::noncopyable {
+         SimpleMutex& _held;
+     public:
+         scoped_lock( SimpleMutex &m ) : _held(m) {
+             _held.lock();
+         }
+         ~scoped_lock() {
+             _held.unlock();
+         }
+     };
+ };
+#else
+ /** pthread SimpleMutex: a plain lock/unlock mutex on top of pthread_mutex_t.
+     The name passed to the constructor is not used by this implementation.
+
+     Each pthread call is made outside the assert() expression, so the call
+     still happens if assert is ever the standard macro compiled out under
+     NDEBUG (e.g. after an #include of <assert.h>). Only the check of the
+     result is subject to the assert macro in effect. */
+ class SimpleMutex : boost::noncopyable {
+ public:
+     SimpleMutex(const char* name) {
+         const int rc = pthread_mutex_init(&_lock,0);
+         (void)rc; // referenced even when assert expands to nothing
+         assert( rc == 0 );
+     }
+     ~SimpleMutex(){
+         // during static destruction the mutex is deliberately left alive
+         if ( ! StaticObserver::_destroyingStatics ) {
+             const int rc = pthread_mutex_destroy(&_lock);
+             (void)rc;
+             assert( rc == 0 );
+         }
+     }
+
+     void lock() {
+         const int rc = pthread_mutex_lock(&_lock);
+         (void)rc;
+         assert( rc == 0 );
+     }
+     void unlock() {
+         const int rc = pthread_mutex_unlock(&_lock);
+         (void)rc;
+         assert( rc == 0 );
+     }
+
+     /** locks the mutex on construction and unlocks it on destruction */
+     class scoped_lock : boost::noncopyable {
+         SimpleMutex& _m;
+     public:
+         scoped_lock( SimpleMutex &m ) : _m(m) { _m.lock(); }
+         ~scoped_lock() { _m.unlock(); }
+     };
+
+ private:
+     pthread_mutex_t _lock;
+ };
+
+#endif
+
}
diff --git a/util/concurrency/race.h b/util/concurrency/race.h
index 0b8338c..4644e37 100644
--- a/util/concurrency/race.h
+++ b/util/concurrency/race.h
@@ -19,15 +19,56 @@ namespace mongo {
the same time. Also detects and disallows recursion.
*/
+#ifdef _WIN32
+ typedef unsigned threadId_t;
+#else
+ typedef pthread_t threadId_t;
+#endif
+
+
#if defined(_DEBUG)
+ namespace race {
+
+ /** records which thread first passed through a piece of code.
+ file is the label printed when a Check on this point fails. */
+ class CodePoint {
+ public:
+ string lastName; // thread name recorded by the first Check
+ threadId_t lastTid; // id of that thread; 0 means no thread recorded yet
+ string file;
+ CodePoint(string f) : lastTid(0), file(f) { }
+ };
+ /** constructing a Check on a CodePoint records the current thread if none is recorded
+ yet; if a different thread is already recorded it logs both thread names and calls
+ mongoAbort("racecheck").
+ NOTE(review): GetCurrentThreadId() is used on every platform although threadId_t is
+ pthread_t outside _WIN32 -- confirm a non-Windows definition exists elsewhere. */
+ class Check {
+ public:
+ Check(CodePoint& p) {
+ threadId_t t = GetCurrentThreadId();
+ if( p.lastTid == 0 ) {
+ // first thread through: remember it
+ p.lastTid = t;
+ p.lastName = getThreadName();
+ }
+ else if( t != p.lastTid ) {
+ // a second, different thread reached the same point
+ log() << "\n\n\n\n\nRACE? error assert\n " << p.file << '\n'
+ << " " << p.lastName
+ << " " << getThreadName() << "\n\n" << endl;
+ mongoAbort("racecheck");
+ }
+ };
+ };
+
+ }
+
+#define RACECHECK
+ // dm TODO - the right code for this file is in a different branch at the moment (merge)
+ //#define RACECHECK
+ //static race::CodePoint __cp(__FILE__);
+ //race::Check __ck(__cp);
+
class CodeBlock {
volatile int n;
- unsigned tid;
+ threadId_t tid;
void fail() {
log() << "synchronization (race condition) failure" << endl;
printStackTrace();
- abort();
+ ::abort();
}
void enter() {
if( ++n != 1 ) fail();
@@ -58,6 +99,8 @@ namespace mongo {
#else
+#define RACECHECK
+
class CodeBlock{
public:
class Within {
@@ -69,4 +112,4 @@ namespace mongo {
#endif
-}
+} // namespace
diff --git a/util/concurrency/rwlock.h b/util/concurrency/rwlock.h
index ca81a9f..d8a11ea 100644
--- a/util/concurrency/rwlock.h
+++ b/util/concurrency/rwlock.h
@@ -21,9 +21,11 @@
#include "mutex.h"
#include "../time_support.h"
-// this requires Vista+ to work
+// this requires newer windows versions
// it works better than sharable_mutex under high contention
+#if defined(_WIN64)
//#define MONGO_USE_SRW_ON_WINDOWS 1
+#endif
#if !defined(MONGO_USE_SRW_ON_WINDOWS)
@@ -55,131 +57,153 @@ namespace mongo {
#if defined(MONGO_USE_SRW_ON_WINDOWS) && defined(_WIN32)
- class RWLock {
+ // Windows RWLock implementation (requires newer versions of windows thus the above macro)
+ class RWLock : boost::noncopyable {
public:
- RWLock(const char *) { InitializeSRWLock(&_lock); }
+ RWLock(const char *, int lowPriorityWaitMS=0 ) : _lowPriorityWaitMS(lowPriorityWaitMS)
+ { InitializeSRWLock(&_lock); }
~RWLock() { }
+ const char * implType() const { return "WINSRW"; }
+ int lowPriorityWaitMS() const { return _lowPriorityWaitMS; }
void lock() { AcquireSRWLockExclusive(&_lock); }
void unlock() { ReleaseSRWLockExclusive(&_lock); }
void lock_shared() { AcquireSRWLockShared(&_lock); }
void unlock_shared() { ReleaseSRWLockShared(&_lock); }
bool lock_shared_try( int millis ) {
+ if( TryAcquireSRWLockShared(&_lock) )
+ return true;
+ if( millis == 0 )
+ return false;
unsigned long long end = curTimeMicros64() + millis*1000;
while( 1 ) {
+ Sleep(1);
if( TryAcquireSRWLockShared(&_lock) )
return true;
if( curTimeMicros64() >= end )
break;
- Sleep(1);
}
return false;
}
bool lock_try( int millis = 0 ) {
+ if( TryAcquireSRWLockExclusive(&_lock) ) // quick check to optimistically avoid calling curTimeMicros64
+ return true;
+ if( millis == 0 )
+ return false;
unsigned long long end = curTimeMicros64() + millis*1000;
- while( 1 ) {
+ do {
+ Sleep(1);
if( TryAcquireSRWLockExclusive(&_lock) )
return true;
- if( curTimeMicros64() >= end )
- break;
- Sleep(1);
- }
+ } while( curTimeMicros64() < end );
return false;
}
private:
SRWLOCK _lock;
+ const int _lowPriorityWaitMS;
};
#elif defined(BOOST_RWLOCK)
- class RWLock {
+
+ // Boost based RWLock implementation
+ class RWLock : boost::noncopyable {
shared_mutex _m;
+ const int _lowPriorityWaitMS;
public:
-#if defined(_DEBUG)
- const char *_name;
- RWLock(const char *name) : _name(name) { }
-#else
- RWLock(const char *) { }
-#endif
+ const char * const _name;
+
+ RWLock(const char *name, int lowPriorityWait=0) : _lowPriorityWaitMS(lowPriorityWait) , _name(name) { }
+
+ const char * implType() const { return "boost"; }
+
+ int lowPriorityWaitMS() const { return _lowPriorityWaitMS; }
+
void lock() {
- _m.lock();
-#if defined(_DEBUG)
- mutexDebugger.entering(_name);
-#endif
+ _m.lock();
+ DEV mutexDebugger.entering(_name);
}
+
+ /*void lock() {
+ // This sequence gives us the lock semantics we want: specifically that write lock acquisition is
+ // greedy EXCEPT when someone already is in upgradable state.
+ lockAsUpgradable();
+ upgrade();
+ DEV mutexDebugger.entering(_name);
+ }*/
+
void unlock() {
-#if defined(_DEBUG)
- mutexDebugger.leaving(_name);
-#endif
+ DEV mutexDebugger.leaving(_name);
_m.unlock();
}
+ void lockAsUpgradable() {
+ _m.lock_upgrade();
+ }
+ void unlockFromUpgradable() { // upgradable -> unlocked
+ _m.unlock_upgrade();
+ }
+ void upgrade() { // upgradable -> exclusive lock
+ _m.unlock_upgrade_and_lock();
+ }
+
void lock_shared() {
_m.lock_shared();
}
-
void unlock_shared() {
_m.unlock_shared();
}
bool lock_shared_try( int millis ) {
- boost::system_time until = get_system_time();
- until += boost::posix_time::milliseconds(millis);
- if( _m.timed_lock_shared( until ) ) {
+ if( _m.timed_lock_shared( boost::posix_time::milliseconds(millis) ) ) {
return true;
}
return false;
}
bool lock_try( int millis = 0 ) {
- boost::system_time until = get_system_time();
- until += boost::posix_time::milliseconds(millis);
- if( _m.timed_lock( until ) ) {
-#if defined(_DEBUG)
- mutexDebugger.entering(_name);
-#endif
+ if( _m.timed_lock( boost::posix_time::milliseconds(millis) ) ) {
+ DEV mutexDebugger.entering(_name);
return true;
}
return false;
}
-
-
};
+
#else
- class RWLock {
- pthread_rwlock_t _lock;
- inline void check( int x ) {
- if( x == 0 )
+ // Posix RWLock implementation
+ class RWLock : boost::noncopyable {
+ pthread_rwlock_t _lock;
+ const int _lowPriorityWaitMS;
+ static void check( int x ) {
+ if( MONGO_likely(x == 0) )
return;
log() << "pthread rwlock failed: " << x << endl;
assert( x == 0 );
}
-
+
public:
-#if defined(_DEBUG)
const char *_name;
- RWLock(const char *name) : _name(name) {
-#else
- RWLock(const char *) {
-#endif
+ RWLock(const char *name, int lowPriorityWaitMS=0) : _lowPriorityWaitMS(lowPriorityWaitMS), _name(name)
+ {
check( pthread_rwlock_init( &_lock , 0 ) );
}
-
+
~RWLock() {
if ( ! StaticObserver::_destroyingStatics ) {
- check( pthread_rwlock_destroy( &_lock ) );
+ wassert( pthread_rwlock_destroy( &_lock ) == 0 ); // wassert as don't want to throw from a destructor
}
}
+ const char * implType() const { return "posix"; }
+
+ int lowPriorityWaitMS() const { return _lowPriorityWaitMS; }
+
void lock() {
check( pthread_rwlock_wrlock( &_lock ) );
-#if defined(_DEBUG)
- mutexDebugger.entering(_name);
-#endif
+ DEV mutexDebugger.entering(_name);
}
void unlock() {
-#if defined(_DEBUG)
mutexDebugger.leaving(_name);
-#endif
check( pthread_rwlock_unlock( &_lock ) );
}
@@ -197,9 +221,7 @@ namespace mongo {
bool lock_try( int millis = 0 ) {
if( _try( millis , true ) ) {
-#if defined(_DEBUG)
- mutexDebugger.entering(_name);
-#endif
+ DEV mutexDebugger.entering(_name);
return true;
}
return false;
@@ -233,7 +255,7 @@ namespace mongo {
#endif
/** throws on failure to acquire in the specified time period. */
- class rwlock_try_write {
+ class rwlock_try_write : boost::noncopyable {
public:
struct exception { };
rwlock_try_write(RWLock& l, int millis = 0) : _l(l) {
@@ -245,16 +267,57 @@ namespace mongo {
RWLock& _l;
};
+ class rwlock_shared : boost::noncopyable {
+ public:
+ rwlock_shared(RWLock& rwlock) : _r(rwlock) {_r.lock_shared(); }
+ ~rwlock_shared() { _r.unlock_shared(); }
+ private:
+ RWLock& _r;
+ };
+
/* scoped lock for RWLock */
- class rwlock {
+ class rwlock : boost::noncopyable {
public:
- rwlock( const RWLock& lock , bool write , bool alreadyHaveLock = false )
+ /**
+ * @param write acquire write lock if true, sharable lock if false
+ * @param lowPriorityWaitMS if > 0, will try to get the lock non-greedily for that many ms
+ */
+ rwlock( const RWLock& lock , bool write, /* bool alreadyHaveLock = false , */int lowPriorityWaitMS = 0 )
: _lock( (RWLock&)lock ) , _write( write ) {
- if ( ! alreadyHaveLock ) {
- if ( _write )
- _lock.lock();
- else
+
+ {
+ if ( _write ) {
+
+ if ( ! lowPriorityWaitMS && lock.lowPriorityWaitMS() )
+ lowPriorityWaitMS = lock.lowPriorityWaitMS();
+
+ if ( lowPriorityWaitMS ) {
+ bool got = false;
+ for ( int i=0; i<lowPriorityWaitMS; i++ ) {
+ if ( _lock.lock_try(0) ) {
+ got = true;
+ break;
+ }
+
+ int sleep = 1;
+ if ( i > ( lowPriorityWaitMS / 20 ) )
+ sleep = 10;
+ sleepmillis(sleep);
+ i += ( sleep - 1 );
+ }
+ if ( ! got ) {
+ log() << "couldn't get lazy rwlock" << endl;
+ _lock.lock();
+ }
+ }
+ else {
+ _lock.lock();
+ }
+
+ }
+ else {
_lock.lock_shared();
+ }
}
}
~rwlock() {
@@ -267,4 +330,67 @@ namespace mongo {
RWLock& _lock;
const bool _write;
};
+
+ /** recursive on shared locks is ok for this implementation */
+ class RWLockRecursive : boost::noncopyable {
+ ThreadLocalValue<int> _state;
+ RWLock _lk;
+ friend class Exclusive;
+ public:
+ /** @param lpwaitms lazy wait */
+ RWLockRecursive(const char *name, int lpwaitms) : _lk(name, lpwaitms) { }
+
+ void assertExclusivelyLocked() {
+ dassert( _state.get() < 0 );
+ }
+
+ // RWLockRecursive::Exclusive scoped lock
+ class Exclusive : boost::noncopyable {
+ RWLockRecursive& _r;
+ rwlock *_scopedLock;
+ public:
+ Exclusive(RWLockRecursive& r) : _r(r), _scopedLock(0) {
+ int s = _r._state.get();
+ dassert( s <= 0 );
+ if( s == 0 )
+ _scopedLock = new rwlock(_r._lk, true);
+ _r._state.set(s-1);
+ }
+ ~Exclusive() {
+ int s = _r._state.get();
+ DEV wassert( s < 0 ); // wassert: don't throw from destructors
+ _r._state.set(s+1);
+ delete _scopedLock;
+ }
+ };
+
+ // RWLockRecursive::Shared scoped lock
+ class Shared : boost::noncopyable {
+ RWLockRecursive& _r;
+ bool _alreadyExclusive;
+ public:
+ Shared(RWLockRecursive& r) : _r(r) {
+ int s = _r._state.get();
+ _alreadyExclusive = s < 0;
+ if( !_alreadyExclusive ) {
+ dassert( s >= 0 ); // -1 would mean exclusive
+ if( s == 0 )
+ _r._lk.lock_shared();
+ _r._state.set(s+1);
+ }
+ }
+ ~Shared() {
+ if( _alreadyExclusive ) {
+ DEV wassert( _r._state.get() < 0 );
+ }
+ else {
+ int s = _r._state.get() - 1;
+ if( s == 0 )
+ _r._lk.unlock_shared();
+ _r._state.set(s);
+ DEV wassert( s >= 0 );
+ }
+ }
+ };
+ };
}
diff --git a/util/concurrency/shared_mutex_win.hpp b/util/concurrency/shared_mutex_win.hpp
index 5356cf2..e850fc6 100755..100644
--- a/util/concurrency/shared_mutex_win.hpp
+++ b/util/concurrency/shared_mutex_win.hpp
@@ -7,10 +7,31 @@
// accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
-// MongoDB :
-//
-// Slightly modified boost file to not die above 127 pending writes
-//
+/* MongoDB :
+ Slightly modified boost file to not die above 127 pending writes
+ Here is what changed (from boost 1.42.0 shared_mutex.hpp):
+ 1,2c1,2
+ < #ifndef BOOST_THREAD_WIN32_SHARED_MUTEX_HPP
+ < #define BOOST_THREAD_WIN32_SHARED_MUTEX_HPP
+ ---
+ > #ifndef BOOST_THREAD_WIN32_SHARED_MUTEX_HPP_MODIFIED
+ > #define BOOST_THREAD_WIN32_SHARED_MUTEX_HPP_MODIFIED
+ 22c27
+ < class shared_mutex:
+ ---
+ > class modified_shared_mutex:
+ 73c78
+ < shared_mutex():
+ ---
+ > modified_shared_mutex():
+ 84c89
+ < ~shared_mutex()
+ ---
+ > ~modified_shared_mutex()
+ 283a289,290
+ > if( new_state.exclusive_waiting == 127 ) // the maximum already!
+ > break;
+*/
#include <boost/assert.hpp>
#include <boost/detail/interlocked.hpp>
diff --git a/util/concurrency/spin_lock.cpp b/util/concurrency/spin_lock.cpp
index 0f33609..1811f15 100644
--- a/util/concurrency/spin_lock.cpp
+++ b/util/concurrency/spin_lock.cpp
@@ -25,20 +25,28 @@ namespace mongo {
SpinLock::~SpinLock() {
#if defined(_WIN32)
DeleteCriticalSection(&_cs);
+#elif defined(__USE_XOPEN2K)
+ pthread_spin_destroy(&_lock);
#endif
}
SpinLock::SpinLock()
-#if defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4)
- : _locked( false ) { }
-#elif defined(_WIN32)
+#if defined(_WIN32)
{ InitializeCriticalSectionAndSpinCount(&_cs, 4000); }
+#elif defined(__USE_XOPEN2K)
+ { pthread_spin_init( &_lock , 0 ); }
+#elif defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4)
+ : _locked( false ) { }
#else
- : _mutex( "SpinLock" ) { }
+ : _mutex( "SpinLock" ) { }
#endif
void SpinLock::lock() {
-#if defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4)
+#if defined(_WIN32)
+ EnterCriticalSection(&_cs);
+#elif defined(__USE_XOPEN2K)
+ pthread_spin_lock( &_lock );
+#elif defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4)
// fast path
if (!_locked && !__sync_lock_test_and_set(&_locked, true)) {
return;
@@ -55,8 +63,6 @@ namespace mongo {
while (__sync_lock_test_and_set(&_locked, true)) {
nanosleep(&t, NULL);
}
-#elif defined(_WIN32)
- EnterCriticalSection(&_cs);
#else
// WARNING Missing spin lock in this platform. This can potentially
// be slow.
@@ -66,19 +72,28 @@ namespace mongo {
}
void SpinLock::unlock() {
-#if defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4)
-
- __sync_lock_release(&_locked);
-
-#elif defined(WIN32)
-
+#if defined(_WIN32)
LeaveCriticalSection(&_cs);
-
+#elif defined(__USE_XOPEN2K)
+ pthread_spin_unlock(&_lock);
+#elif defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4)
+ __sync_lock_release(&_locked);
#else
-
_mutex.unlock();
+#endif
+ }
+ bool SpinLock::isfast() {
+#if defined(_WIN32)
+ return true;
+#elif defined(__USE_XOPEN2K)
+ return true;
+#elif defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4)
+ return true;
+#else
+ return false;
#endif
}
+
} // namespace mongo
diff --git a/util/concurrency/spin_lock.h b/util/concurrency/spin_lock.h
index 02a8797..65ecb15 100644
--- a/util/concurrency/spin_lock.h
+++ b/util/concurrency/spin_lock.h
@@ -18,8 +18,7 @@
#pragma once
-#include "pch.h"
-#include "rwlock.h"
+#include "mutex.h"
namespace mongo {
@@ -27,7 +26,7 @@ namespace mongo {
* The spinlock currently requires late GCC support routines to be efficient.
* Other platforms default to a mutex implemenation.
*/
- class SpinLock {
+ class SpinLock : boost::noncopyable {
public:
SpinLock();
~SpinLock();
@@ -35,30 +34,30 @@ namespace mongo {
void lock();
void unlock();
+ static bool isfast(); // true if a real spinlock on this platform
+
private:
-#if defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4)
- volatile bool _locked;
-#elif defined(_WIN32)
+#if defined(_WIN32)
CRITICAL_SECTION _cs;
+#elif defined(__USE_XOPEN2K)
+ pthread_spinlock_t _lock;
+#elif defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4)
+ volatile bool _locked;
#else
- // default to a scoped mutex if not implemented
- RWLock _mutex;
+ // default to a mutex if not implemented
+ SimpleMutex _mutex;
#endif
-
- // Non-copyable, non-assignable
- SpinLock(SpinLock&);
- SpinLock& operator=(SpinLock&);
};
- struct scoped_spinlock {
- scoped_spinlock( SpinLock& l ) : _l(l){
+ class scoped_spinlock : boost::noncopyable {
+ public:
+ scoped_spinlock( SpinLock& l ) : _l(l) {
_l.lock();
}
~scoped_spinlock() {
- _l.unlock();
- }
+ _l.unlock();}
+ private:
SpinLock& _l;
};
} // namespace mongo
-
diff --git a/util/concurrency/synchronization.cpp b/util/concurrency/synchronization.cpp
index 12e2894..ce2547c 100644
--- a/util/concurrency/synchronization.cpp
+++ b/util/concurrency/synchronization.cpp
@@ -20,7 +20,8 @@
namespace mongo {
- Notification::Notification() : _mutex ( "Notification" ) , _notified( false ) { }
+ Notification::Notification() : _mutex ( "Notification" ) , _notified( false ) {
+ }
Notification::~Notification() { }
@@ -37,19 +38,40 @@ namespace mongo {
_condition.notify_one();
}
- NotifyAll::NotifyAll() : _mutex("NotifyAll"), _counter(0) { }
+ /* --- NotifyAll --- */
+
+ NotifyAll::NotifyAll() : _mutex("NotifyAll") {
+ _lastDone = 0;
+ _lastReturned = 0;
+ _nWaiting = 0;
+ }
+
+ NotifyAll::When NotifyAll::now() {
+ scoped_lock lock( _mutex );
+ return ++_lastReturned;
+ }
+
+ void NotifyAll::waitFor(When e) {
+ scoped_lock lock( _mutex );
+ ++_nWaiting;
+ while( _lastDone < e ) {
+ _condition.wait( lock.boost() );
+ }
+ }
- void NotifyAll::wait() {
+ void NotifyAll::awaitBeyondNow() {
scoped_lock lock( _mutex );
- unsigned long long old = _counter;
- while( old == _counter ) {
+ ++_nWaiting;
+ When e = ++_lastReturned;
+ while( _lastDone <= e ) {
_condition.wait( lock.boost() );
}
}
- void NotifyAll::notifyAll() {
+ void NotifyAll::notifyAll(When e) {
scoped_lock lock( _mutex );
- ++_counter;
+ _lastDone = e;
+ _nWaiting = 0;
_condition.notify_all();
}
diff --git a/util/concurrency/synchronization.h b/util/concurrency/synchronization.h
index ac2fcab..a0e89f7 100644
--- a/util/concurrency/synchronization.h
+++ b/util/concurrency/synchronization.h
@@ -56,18 +56,30 @@ namespace mongo {
public:
NotifyAll();
+ typedef unsigned long long When;
+
+ When now();
+
/** awaits the next notifyAll() call by another thread. notifications that precede this
call are ignored -- we are looking for a fresh event.
*/
- void wait();
+ void waitFor(When);
+
+ /** a bit faster than waitFor( now() ) */
+ void awaitBeyondNow();
/** may be called multiple times. notifies all waiters */
- void notifyAll();
+ void notifyAll(When);
+
+ /** indicates how many threads are waiting for a notify. */
+ unsigned nWaiting() const { return _nWaiting; }
private:
mongo::mutex _mutex;
- unsigned long long _counter;
boost::condition _condition;
+ When _lastDone;
+ When _lastReturned;
+ unsigned _nWaiting;
};
} // namespace mongo
diff --git a/util/concurrency/value.h b/util/concurrency/value.h
index 0a0ef85..c66977b 100644
--- a/util/concurrency/value.h
+++ b/util/concurrency/value.h
@@ -1,5 +1,5 @@
/* @file value.h
- concurrency helpers Atomic<T> and DiagStr
+ concurrency helpers DiagStr, Guarded
*/
/**
@@ -20,44 +20,29 @@
#pragma once
+#include "mutex.h"
+
namespace mongo {
- extern mutex _atomicMutex;
+ /** declare that a variable is "guarded" by a mutex.
- /** atomic wrapper for a value. enters a mutex on each access. must
- be copyable.
- */
- template<typename T>
- class Atomic : boost::noncopyable {
- T val;
- public:
- Atomic<T>() { }
+ The decl documents the rule. For example "counta and countb are guarded by xyzMutex":
- void operator=(const T& a) {
- scoped_lock lk(_atomicMutex);
- val = a;
- }
+ Guarded<int, xyzMutex> counta;
+ Guarded<int, xyzMutex> countb;
- operator T() const {
- scoped_lock lk(_atomicMutex);
- return val;
+ Upon use, specify the scoped_lock object. This makes it hard for someone
+ later to forget to be in the lock. Check is made that it is the right lock in _DEBUG
+ builds at runtime.
+ */
+ template <typename T, mutex& BY>
+ class Guarded {
+ T _val;
+ public:
+ T& ref(const scoped_lock& lk) {
+ dassert( lk._mut == &BY );
+ return _val;
}
-
- /** example:
- Atomic<int> q;
- ...
- {
- Atomic<int>::tran t(q);
- if( q.ref() > 0 )
- q.ref()--;
- }
- */
- class tran : private scoped_lock {
- Atomic<T>& _a;
- public:
- tran(Atomic<T>& a) : scoped_lock(_atomicMutex), _a(a) { }
- T& ref() { return _a.val; }
- };
};
class DiagStr {
diff --git a/util/concurrency/vars.cpp b/util/concurrency/vars.cpp
index 19b58eb..213e576 100644
--- a/util/concurrency/vars.cpp
+++ b/util/concurrency/vars.cpp
@@ -17,15 +17,13 @@
*/
#include "pch.h"
-#include "value.h"
#include "mutex.h"
+#include "value.h"
namespace mongo {
mutex DiagStr::m("diags");
- mongo::mutex _atomicMutex("_atomicMutex");
-
// intentional leak. otherwise destructor orders can be problematic at termination.
MutexDebugger &mutexDebugger = *(new MutexDebugger());
diff --git a/util/file.h b/util/file.h
index 0a973e3..368e692 100644
--- a/util/file.h
+++ b/util/file.h
@@ -23,10 +23,8 @@
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
-#else
-#include <windows.h>
+#include <sys/statvfs.h>
#endif
-
#include "text.h"
namespace mongo {
@@ -37,6 +35,8 @@ namespace mongo {
typedef boost::uint64_t fileofs;
#endif
+ /* NOTE: not thread-safe (at least the windows implementation isn't). */
+
class FileInterface {
public:
void open(const char *fn) {}
@@ -46,6 +46,12 @@ namespace mongo {
bool is_open() {return false;}
fileofs len() { return 0; }
void fsync() { assert(false); }
+
+ // shrink file to size bytes. No-op if file already smaller.
+ void truncate(fileofs size);
+
+ /** @return -1 if error or unavailable */
+ static boost::intmax_t freeSpace(const string &path) { assert(false); return -1; }
};
#if defined(_WIN32)
@@ -54,10 +60,11 @@ namespace mongo {
class File : public FileInterface {
HANDLE fd;
bool _bad;
+ string _name;
void err(BOOL b=false) { /* false = error happened */
if( !b && !_bad ) {
_bad = true;
- log() << "File I/O error " << GetLastError() << '\n';
+ log() << "File " << _name << "I/O error " << GetLastError() << '\n';
}
}
public:
@@ -69,7 +76,8 @@ namespace mongo {
if( is_open() ) CloseHandle(fd);
fd = INVALID_HANDLE_VALUE;
}
- void open(const char *filename, bool readOnly=false ) {
+ void open(const char *filename, bool readOnly=false , bool direct=false) {
+ _name = filename;
fd = CreateFile(
toNativeString(filename).c_str(),
( readOnly ? 0 : GENERIC_WRITE ) | GENERIC_READ, FILE_SHARE_WRITE|FILE_SHARE_READ,
@@ -81,6 +89,15 @@ namespace mongo {
else
_bad = false;
}
+ static boost::intmax_t freeSpace(const string &path) {
+ ULARGE_INTEGER avail;
+ if( GetDiskFreeSpaceEx(toNativeString(path.c_str()).c_str(), &avail, NULL, NULL) ) {
+ return avail.QuadPart;
+ }
+ DWORD e = GetLastError();
+ log() << "GetDiskFreeSpaceEx fails errno: " << e << endl;
+ return -1;
+ }
void write(fileofs o, const char *data, unsigned len) {
LARGE_INTEGER li;
li.QuadPart = o;
@@ -111,6 +128,20 @@ namespace mongo {
return li.QuadPart;
}
void fsync() { FlushFileBuffers(fd); }
+
+ void truncate(fileofs size) {
+ if (len() <= size)
+ return;
+
+ LARGE_INTEGER li;
+ li.QuadPart = size;
+ if (SetFilePointerEx(fd, li, NULL, FILE_BEGIN) == 0){
+ err(false);
+ return; //couldn't seek
+ }
+
+ err(SetEndOfFile(fd));
+ }
};
#else
@@ -140,9 +171,13 @@ namespace mongo {
#define O_NOATIME 0
#endif
- void open(const char *filename, bool readOnly=false ) {
+ void open(const char *filename, bool readOnly=false , bool direct=false) {
fd = ::open(filename,
- O_CREAT | ( readOnly ? 0 : ( O_RDWR | O_NOATIME ) ) ,
+ O_CREAT | ( readOnly ? 0 : ( O_RDWR | O_NOATIME ) )
+#if defined(O_DIRECT)
+ | ( direct ? O_DIRECT : 0 )
+#endif
+ ,
S_IRUSR | S_IWUSR);
if ( fd <= 0 ) {
out() << "couldn't open " << filename << ' ' << errnoWithDescription() << endl;
@@ -154,14 +189,37 @@ namespace mongo {
err( ::pwrite(fd, data, len, o) == (int) len );
}
void read(fileofs o, char *data, unsigned len) {
- err( ::pread(fd, data, len, o) == (int) len );
+ ssize_t s = ::pread(fd, data, len, o);
+ if( s == -1 ) {
+ err(false);
+ }
+ else if( s != (int) len ) {
+ _bad = true;
+ log() << "File error read:" << s << " bytes, wanted:" << len << " ofs:" << o << endl;
+ }
}
bool bad() { return _bad; }
bool is_open() { return fd > 0; }
fileofs len() {
- return lseek(fd, 0, SEEK_END);
+ off_t o = lseek(fd, 0, SEEK_END);
+ if( o != (off_t) -1 )
+ return o;
+ err(false);
+ return 0;
}
void fsync() { ::fsync(fd); }
+ static boost::intmax_t freeSpace ( const string &path ) {
+ struct statvfs info;
+ assert( !statvfs( path.c_str() , &info ) );
+ return boost::intmax_t( info.f_bavail ) * info.f_frsize;
+ }
+
+ void truncate(fileofs size) {
+ if (len() <= size)
+ return;
+
+ err(ftruncate(fd, size) == 0);
+ }
};
diff --git a/util/file_allocator.cpp b/util/file_allocator.cpp
index 54590ed..b0572f9 100644
--- a/util/file_allocator.cpp
+++ b/util/file_allocator.cpp
@@ -32,19 +32,22 @@ using namespace mongoutils;
#endif
#include "file_allocator.h"
+#include "paths.h"
namespace mongo {
- void ensureParentDirCreated(const boost::filesystem::path& p){
+ boost::filesystem::path ensureParentDirCreated(const boost::filesystem::path& p){
const boost::filesystem::path parent = p.branch_path();
-
+
if (! boost::filesystem::exists(parent)){
ensureParentDirCreated(parent);
log() << "creating directory " << parent.string() << endl;
boost::filesystem::create_directory(parent);
+ flushMyDirectory(parent); // flushes grandparent to ensure parent exists after crash
}
-
+
assert(boost::filesystem::is_directory(parent));
+ return parent;
}
#if defined(_WIN32)
@@ -74,6 +77,10 @@ namespace mongo {
// TODO : we should to avoid fragmentation
}
+ bool FileAllocator::hasFailed() const {
+ return false;
+ }
+
#else
FileAllocator::FileAllocator()
@@ -174,6 +181,10 @@ namespace mongo {
}
}
+ bool FileAllocator::hasFailed() const {
+ return _failed;
+ }
+
void FileAllocator::checkFailure() {
if (_failed) {
// we want to log the problem (diskfull.js expects it) but we do not want to dump a stack tracke
@@ -197,6 +208,19 @@ namespace mongo {
return false;
}
+ string makeTempFileName( path root ) {
+ while( 1 ) {
+ path p = root / "_tmp";
+ stringstream ss;
+ ss << (unsigned) rand();
+ p /= ss.str();
+ string fn = p.string();
+ if( !boost::filesystem::exists(p) )
+ return fn;
+ }
+ return "";
+ }
+
void FileAllocator::run( FileAllocator * fa ) {
setThreadName( "FileAllocator" );
while( 1 ) {
@@ -215,19 +239,25 @@ namespace mongo {
name = fa->_pending.front();
size = fa->_pendingSize[ name ];
}
+
+ string tmp;
+ long fd = 0;
try {
log() << "allocating new datafile " << name << ", filling with zeroes..." << endl;
- ensureParentDirCreated(name);
- long fd = open(name.c_str(), O_CREAT | O_RDWR | O_NOATIME, S_IRUSR | S_IWUSR);
+
+ boost::filesystem::path parent = ensureParentDirCreated(name);
+ tmp = makeTempFileName( parent );
+ ensureParentDirCreated(tmp);
+
+ fd = open(tmp.c_str(), O_CREAT | O_RDWR | O_NOATIME, S_IRUSR | S_IWUSR);
if ( fd <= 0 ) {
- stringstream ss;
- ss << "FileAllocator: couldn't open " << name << ' ' << errnoWithDescription();
- uassert( 10439 , ss.str(), fd <= 0 );
+ log() << "FileAllocator: couldn't create " << name << " (" << tmp << ") " << errnoWithDescription() << endl;
+ uasserted(10439, "");
}
#if defined(POSIX_FADV_DONTNEED)
if( posix_fadvise(fd, 0, size, POSIX_FADV_DONTNEED) ) {
- log() << "warning: posix_fadvise fails " << name << ' ' << errnoWithDescription() << endl;
+ log() << "warning: posix_fadvise fails " << name << " (" << tmp << ") " << errnoWithDescription() << endl;
}
#endif
@@ -236,18 +266,32 @@ namespace mongo {
/* make sure the file is the full desired length */
ensureLength( fd , size );
+ close( fd );
+ fd = 0;
+
+ if( rename(tmp.c_str(), name.c_str()) ) {
+ log() << "error: couldn't rename " << tmp << " to " << name << ' ' << errnoWithDescription() << endl;
+ uasserted(13653, "");
+ }
+ flushMyDirectory(name);
+
log() << "done allocating datafile " << name << ", "
<< "size: " << size/1024/1024 << "MB, "
<< " took " << ((double)t.millis())/1000.0 << " secs"
<< endl;
- close( fd );
-
+ // no longer in a failed state. allow new writers.
+ fa->_failed = false;
}
catch ( ... ) {
+ if ( fd > 0 )
+ close( fd );
log() << "error failed to allocate new file: " << name
- << " size: " << size << ' ' << errnoWithDescription() << endl;
+ << " size: " << size << ' ' << errnoWithDescription() << warnings;
+ log() << " will try again in 10 seconds" << endl; // not going to warning logs
try {
+ if ( tmp.size() )
+ BOOST_CHECK_EXCEPTION( boost::filesystem::remove( tmp ) );
BOOST_CHECK_EXCEPTION( boost::filesystem::remove( name ) );
}
catch ( ... ) {
@@ -256,7 +300,10 @@ namespace mongo {
fa->_failed = true;
// not erasing from pending
fa->_pendingUpdated.notify_all();
- return; // no more allocation
+
+
+ sleepsecs(10);
+ continue;
}
{
diff --git a/util/file_allocator.h b/util/file_allocator.h
index 6cc7b2d..7c3cacb 100644
--- a/util/file_allocator.h
+++ b/util/file_allocator.h
@@ -47,12 +47,14 @@ namespace mongo {
void allocateAsap( const string &name, unsigned long long &size );
void waitUntilFinished() const;
+
+ bool hasFailed() const;
static void ensureLength(int fd , long size);
/** @return the singletone */
static FileAllocator * get();
-
+
private:
FileAllocator();
@@ -84,6 +86,6 @@ namespace mongo {
};
/** like "mkdir -p" but on parent dir of p rather than p itself */
- void ensureParentDirCreated(const boost::filesystem::path& p);
+ boost::filesystem::path ensureParentDirCreated(const boost::filesystem::path& p);
} // namespace mongo
diff --git a/util/goodies.h b/util/goodies.h
index 53a74c2..65bfbab 100644
--- a/util/goodies.h
+++ b/util/goodies.h
@@ -109,49 +109,12 @@ namespace mongo {
// PRINTFL; prints file:line
#define MONGO_PRINTFL cout << __FILE__ ":" << __LINE__ << endl
#define PRINTFL MONGO_PRINTFL
+#define MONGO_FLOG log() << __FILE__ ":" << __LINE__ << endl
+#define FLOG MONGO_FLOG
#undef assert
#define assert MONGO_assert
- struct WrappingInt {
- WrappingInt() {
- x = 0;
- }
- WrappingInt(unsigned z) : x(z) { }
- unsigned x;
- operator unsigned() const {
- return x;
- }
-
-
- static int diff(unsigned a, unsigned b) {
- return a-b;
- }
- bool operator<=(WrappingInt r) {
- // platform dependent
- int df = (r.x - x);
- return df >= 0;
- }
- bool operator>(WrappingInt r) {
- return !(r<=*this);
- }
- };
-
- /*
-
- class DebugMutex : boost::noncopyable {
- friend class lock;
- mongo::mutex m;
- int locked;
- public:
- DebugMutex() : locked(0); { }
- bool isLocked() { return locked; }
- };
-
- */
-
-//typedef scoped_lock lock;
-
inline bool startsWith(const char *str, const char *prefix) {
size_t l = strlen(prefix);
if ( strlen(str) < l ) return false;
@@ -236,6 +199,7 @@ namespace mongo {
_active = 0;
}
+ // typically you do ProgressMeterHolder
void reset( unsigned long long total , int secondsBetween = 3 , int checkInterval = 100 ) {
_total = total;
_secondsBetween = secondsBetween;
@@ -257,6 +221,7 @@ namespace mongo {
}
/**
+ * @param n how far along we are relative to the total # we set in CurOp::setMessage
* @return if row was printed
*/
bool hit( int n = 1 ) {
@@ -282,13 +247,15 @@ namespace mongo {
return true;
}
- unsigned long long done() {
- return _done;
+ void setTotalWhileRunning( unsigned long long total ) {
+ _total = total;
}
- unsigned long long hits() {
- return _hits;
- }
+ unsigned long long done() const { return _done; }
+
+ unsigned long long hits() const { return _hits; }
+
+ unsigned long long total() const { return _total; }
string toString() const {
if ( ! _active )
@@ -314,6 +281,10 @@ namespace mongo {
int _lastTime;
};
+ // e.g.:
+ // CurOp * op = cc().curop();
+ // ProgressMeterHolder pm( op->setMessage( "index: (1/3) external sort" , d->stats.nrecords , 10 ) );
+ // loop { pm.hit(); }
class ProgressMeterHolder : boost::noncopyable {
public:
ProgressMeterHolder( ProgressMeter& pm )
@@ -417,7 +388,7 @@ namespace mongo {
class ThreadSafeString {
public:
ThreadSafeString( size_t size=256 )
- : _size( 256 ) , _buf( new char[256] ) {
+ : _size( size ) , _buf( new char[size] ) {
memset( _buf , 0 , _size );
}
@@ -468,97 +439,6 @@ namespace mongo {
ostream& operator<<( ostream &s, const ThreadSafeString &o );
- inline bool isNumber( char c ) {
- return c >= '0' && c <= '9';
- }
-
- inline unsigned stringToNum(const char *str) {
- unsigned x = 0;
- const char *p = str;
- while( 1 ) {
- if( !isNumber(*p) ) {
- if( *p == 0 && p != str )
- break;
- throw 0;
- }
- x = x * 10 + *p++ - '0';
- }
- return x;
- }
-
- // for convenience, '{' is greater than anything and stops number parsing
- inline int lexNumCmp( const char *s1, const char *s2 ) {
- //cout << "START : " << s1 << "\t" << s2 << endl;
- while( *s1 && *s2 ) {
-
- bool p1 = ( *s1 == (char)255 );
- bool p2 = ( *s2 == (char)255 );
- //cout << "\t\t " << p1 << "\t" << p2 << endl;
- if ( p1 && !p2 )
- return 1;
- if ( p2 && !p1 )
- return -1;
-
- bool n1 = isNumber( *s1 );
- bool n2 = isNumber( *s2 );
-
- if ( n1 && n2 ) {
- // get rid of leading 0s
- while ( *s1 == '0' ) s1++;
- while ( *s2 == '0' ) s2++;
-
- char * e1 = (char*)s1;
- char * e2 = (char*)s2;
-
- // find length
- // if end of string, will break immediately ('\0')
- while ( isNumber (*e1) ) e1++;
- while ( isNumber (*e2) ) e2++;
-
- int len1 = (int)(e1-s1);
- int len2 = (int)(e2-s2);
-
- int result;
- // if one is longer than the other, return
- if ( len1 > len2 ) {
- return 1;
- }
- else if ( len2 > len1 ) {
- return -1;
- }
- // if the lengths are equal, just strcmp
- else if ( (result = strncmp(s1, s2, len1)) != 0 ) {
- return result;
- }
-
- // otherwise, the numbers are equal
- s1 = e1;
- s2 = e2;
- continue;
- }
-
- if ( n1 )
- return 1;
-
- if ( n2 )
- return -1;
-
- if ( *s1 > *s2 )
- return 1;
-
- if ( *s2 > *s1 )
- return -1;
-
- s1++; s2++;
- }
-
- if ( *s1 )
- return 1;
- if ( *s2 )
- return -1;
- return 0;
- }
-
/** A generic pointer type for function arguments.
* It will convert from any pointer type except auto_ptr.
* Semantics are the same as passing the pointer returned from get()
@@ -597,6 +477,8 @@ namespace mongo {
T* _p;
};
+
+
/** Hmmmm */
using namespace boost;
diff --git a/util/hashtab.h b/util/hashtab.h
index 6818bef..f1a3306 100644
--- a/util/hashtab.h
+++ b/util/hashtab.h
@@ -54,7 +54,7 @@ namespace mongo {
}
};
void* _buf;
- int n;
+ int n; // number of hashtable buckets
int maxChain;
Node& nodes(int i) {
@@ -156,9 +156,9 @@ namespace mongo {
typedef void (*IteratorCallback)( const Key& k , Type& v );
void iterAll( IteratorCallback callback ) {
for ( int i=0; i<n; i++ ) {
- if ( ! nodes(i).inUse() )
- continue;
- callback( nodes(i).k , nodes(i).value );
+ if ( nodes(i).inUse() ) {
+ callback( nodes(i).k , nodes(i).value );
+ }
}
}
@@ -166,9 +166,9 @@ namespace mongo {
typedef void (*IteratorCallback2)( const Key& k , Type& v , void * extra );
void iterAll( IteratorCallback2 callback , void * extra ) {
for ( int i=0; i<n; i++ ) {
- if ( ! nodes(i).inUse() )
- continue;
- callback( nodes(i).k , nodes(i).value , extra );
+ if ( nodes(i).inUse() ) {
+ callback( nodes(i).k , nodes(i).value , extra );
+ }
}
}
diff --git a/util/log.cpp b/util/log.cpp
index eb1cbae..bc48584 100644
--- a/util/log.cpp
+++ b/util/log.cpp
@@ -19,16 +19,18 @@
#include "pch.h"
#include "assert_util.h"
#include "assert.h"
-//#include "file.h"
#include <cmath>
+#include "time_support.h"
using namespace std;
-#ifndef _WIN32
-#include <cxxabi.h>
-#include <sys/file.h>
+#ifdef _WIN32
+# include <io.h>
+#else
+# include <cxxabi.h>
+# include <sys/file.h>
#endif
-#include "../db/jsobj.h"
+//#include "../db/jsobj.h"
namespace mongo {
@@ -47,6 +49,8 @@ namespace mongo {
uassert( 10268 , "LoggingManager already started" , ! _enabled );
_append = append;
+ bool exists = boost::filesystem::exists(lp);
+
// test path
FILE * test = fopen( lp.c_str() , _append ? "a" : "w" );
if ( ! test ) {
@@ -59,6 +63,14 @@ namespace mongo {
dbexit( EXIT_BADOPTIONS );
assert( 0 );
}
+
+ if (append && exists){
+ // two blank lines before and after
+ const string msg = "\n\n***** SERVER RESTARTED *****\n\n\n";
+ massert(14036, errnoWithPrefix("couldn't write to log file"),
+ fwrite(msg.data(), 1, msg.size(), test) == msg.size());
+ }
+
fclose( test );
_path = lp;
@@ -74,7 +86,7 @@ namespace mongo {
if ( _file ) {
#ifdef _WIN32
- cout << "log rotation doesn't work on windows" << endl;
+ cout << "log rotation not yet supported on windows" << endl;
return;
#else
struct tm t;
@@ -95,8 +107,21 @@ namespace mongo {
assert(0);
}
+#ifdef _WIN32 // windows has these functions it just gives them a funny name
+# define dup2 _dup2
+# define fileno _fileno
+#endif
+ // redirect stderr to log file
+ dup2(fileno(tmp), 2);
+
Logstream::setLogFile(tmp); // after this point no thread will be using old file
+#if 0 // enable to test redirection
+ cout << "written to cout" << endl;
+ cerr << "written to cerr" << endl;
+ log() << "written to log()" << endl;
+#endif
+
_file = tmp;
_opened = time(0);
}
@@ -125,4 +150,3 @@ namespace mongo {
FILE* Logstream::logfile = stdout;
}
-
diff --git a/util/log.h b/util/log.h
index 86aae1c..d5c7e55 100644
--- a/util/log.h
+++ b/util/log.h
@@ -46,6 +46,39 @@ namespace mongo {
}
}
+ class LabeledLevel { // a log level plus an optional label; label parts are joined with "::"
+ public:
+ // construct from a bare level (label stays empty) or from a label/level pair
+ LabeledLevel( int level ) : _level( level ) {}
+ LabeledLevel( const char* label, int level ) : _label( label ), _level( level ) {}
+ LabeledLevel( const string& label, int level ) : _label( label ), _level( level ) {}
+ // returns a copy with the level raised by i; the label is kept
+ LabeledLevel operator+( int i ) const {
+ return LabeledLevel( _label, _level + i );
+ }
+ // returns a copy with label appended; no "::" is inserted when the current label is empty
+ LabeledLevel operator+( const char* label ) const {
+ if( _label == "" )
+ return LabeledLevel( label, _level );
+ return LabeledLevel( _label + string("::") + label, _level );
+ }
+ // string overload: same empty-label rule as the const char* overload; accepts const strings and temporaries
+ LabeledLevel operator+( const string& label ) const {
+ return LabeledLevel( _label.empty() ? label : _label + string("::") + label, _level );
+ }
+ // returns a copy with the level lowered by i; the label is kept
+ LabeledLevel operator-( int i ) const {
+ return LabeledLevel( _label, _level - i );
+ }
+
+ const string& getLabel() const { return _label; }
+ int getLevel() const { return _level; }
+
+ private:
+ string _label;
+ int _level;
+ };
+
class LazyString {
public:
virtual ~LazyString() {}
@@ -104,6 +137,9 @@ namespace mongo {
virtual Nullstream& operator<<(unsigned) {
return *this;
}
+ virtual Nullstream& operator<<(unsigned short) {
+ return *this;
+ }
virtual Nullstream& operator<<(double) {
return *this;
}
@@ -209,6 +245,7 @@ namespace mongo {
Logstream& operator<<(long x) { ss << x; return *this; }
Logstream& operator<<(unsigned long x) { ss << x; return *this; }
Logstream& operator<<(unsigned x) { ss << x; return *this; }
+ Logstream& operator<<(unsigned short x){ ss << x; return *this; }
Logstream& operator<<(double x) { ss << x; return *this; }
Logstream& operator<<(void *x) { ss << x; return *this; }
Logstream& operator<<(const void *x) { ss << x; return *this; }
@@ -261,6 +298,9 @@ namespace mongo {
}
public:
static Logstream& get() {
+ if ( StaticObserver::_destroyingStatics ) {
+ cout << "Logstream::get called in uninitialized state" << endl;
+ }
Logstream *p = tsp.get();
if( p == 0 )
tsp.reset( p = new Logstream() );
@@ -291,7 +331,7 @@ namespace mongo {
return Logstream::get();
}
- /** logging which we may not want during unit tests runs.
+ /** logging which we may not want during unit tests (dbtests) runs.
set tlogLevel to -1 to suppress tlog() output in a test program. */
inline Nullstream& tlog( int level = 0 ) {
if ( level > tlogLevel || level > logLevel )
@@ -305,13 +345,19 @@ namespace mongo {
return Logstream::get().prolog();
}
-#define MONGO_LOG(level) if ( logLevel >= (level) ) log( level )
+#define MONGO_LOG(level) if ( MONGO_unlikely(logLevel >= (level)) ) log( level )
#define LOG MONGO_LOG
inline Nullstream& log( LogLevel l ) {
return Logstream::get().prolog().setLogLevel( l );
}
+ inline Nullstream& log( const LabeledLevel& ll ) {
+ Nullstream& out = log( ll.getLevel() );
+ if( ! ll.getLabel().empty() ) // unlabeled levels get no "[label] " prefix
+ out << "[" << ll.getLabel() << "] ";
+ return out;
+ }
inline Nullstream& log() {
return Logstream::get().prolog();
@@ -392,7 +438,6 @@ namespace mongo {
string errnoWithPrefix( const char * prefix );
void Logstream::logLockless( const StringData& s ) {
-
if ( s.size() == 0 )
return;
@@ -475,4 +520,6 @@ namespace mongo {
}
};
+ extern Tee* const warnings; // Things put here go in serverStatus
+
} // namespace mongo
diff --git a/util/logfile.cpp b/util/logfile.cpp
index 0386a59..609edb8 100644
--- a/util/logfile.cpp
+++ b/util/logfile.cpp
@@ -77,18 +77,35 @@ namespace mongo {
CloseHandle(_fd);
}
- void LogFile::synchronousAppend(const void *buf, size_t len) {
- assert(_fd);
- DWORD written;
- if( !WriteFile(_fd, buf, len, &written, NULL) ) {
- DWORD e = GetLastError();
- if( e == 87 )
- massert(13519, "error appending to file - misaligned direct write?", false);
- else
- uasserted(13517, str::stream() << "error appending to file " << errnoWithDescription(e));
+ void LogFile::truncate() {
+ verify(15870, _fd != INVALID_HANDLE_VALUE);
+
+ if (!SetEndOfFile(_fd)){
+ msgasserted(15871, "Couldn't truncate file: " + errnoWithDescription());
}
- else {
- dassert( written == len );
+ }
+
+ void LogFile::synchronousAppend(const void *_buf, size_t _len) { // writes _len bytes from _buf, at most BlockSize per WriteFile call
+ const size_t BlockSize = 8 * 1024 * 1024; // 8MB upper bound for a single WriteFile call
+ assert(_fd);
+ assert(_len % 4096 == 0); // total length must be a multiple of 4096
+ const char *buf = (const char *) _buf;
+ size_t left = _len; // bytes still to be written
+ while( left ) {
+ size_t toWrite = min(left, BlockSize);
+ DWORD written;
+ if( !WriteFile(_fd, buf, toWrite, &written, NULL) ) {
+ DWORD e = GetLastError();
+ if( e == 87 ) // error 87 gets its own assertion id and message
+ msgasserted(13519, "error 87 appending to file - invalid parameter");
+ else
+ uasserted(13517, str::stream() << "error appending to file " << _name << ' ' << _len << ' ' << toWrite << ' ' << errnoWithDescription(e));
+ }
+ else {
+ dassert( written == toWrite ); // a short write is only checked via dassert
+ }
+ left -= written; // advance by the count WriteFile reported
+ buf += written;
+ }
}
@@ -96,28 +113,44 @@ namespace mongo {
#else
+// posix
+
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
+#include "paths.h"
namespace mongo {
LogFile::LogFile(string name) : _name(name) {
- _fd = open(name.c_str(),
- O_CREAT
- | O_WRONLY
+ int options = O_CREAT
+ | O_WRONLY
#if defined(O_DIRECT)
- | O_DIRECT
+ | O_DIRECT
#endif
#if defined(O_NOATIME)
- | O_NOATIME
+ | O_NOATIME
+#endif
+ ;
+
+ _fd = open(name.c_str(), options, S_IRUSR | S_IWUSR);
+
+#if defined(O_DIRECT)
+ _direct = true;
+ if( _fd < 0 ) {
+ _direct = false;
+ options &= ~O_DIRECT;
+ _fd = open(name.c_str(), options, S_IRUSR | S_IWUSR);
+ }
+#else
+ _direct = false;
#endif
- ,
- S_IRUSR | S_IWUSR);
+
if( _fd < 0 ) {
uasserted(13516, str::stream() << "couldn't open file " << name << " for writing " << errnoWithDescription());
}
+ flushMyDirectory(name);
}
LogFile::~LogFile() {
@@ -126,7 +159,21 @@ namespace mongo {
_fd = -1;
}
+ void LogFile::truncate() { // cuts the file off at the descriptor's current offset
+ verify(15872, _fd >= 0);
+ // the offset is held in an off_t below, so require it to be 8 bytes wide
+ BOOST_STATIC_ASSERT(sizeof(off_t) == 8); // we don't want overflow here
+ const off_t pos = lseek(_fd, 0, SEEK_CUR); // doesn't actually seek; NOTE(review): a -1 result is not checked before use
+ if (ftruncate(_fd, pos) != 0){
+ msgasserted(15873, "Couldn't truncate file: " + errnoWithDescription());
+ }
+ }
+
void LogFile::synchronousAppend(const void *b, size_t len) {
+#ifdef POSIX_FADV_DONTNEED
+ const off_t pos = lseek(_fd, 0, SEEK_CUR); // doesn't actually seek
+#endif
+
const char *buf = (char *) b;
assert(_fd);
assert(((size_t)buf)%4096==0); // aligned
@@ -150,6 +197,10 @@ namespace mongo {
uasserted(13514, str::stream() << "error appending to file on fsync " << ' ' << errnoWithDescription());
}
+#ifdef POSIX_FADV_DONTNEED
+ if (!_direct)
+ posix_fadvise(_fd, pos, len, POSIX_FADV_DONTNEED);
+#endif
}
}
diff --git a/util/logfile.h b/util/logfile.h
index 9085161..f6d1c94 100644
--- a/util/logfile.h
+++ b/util/logfile.h
@@ -38,6 +38,8 @@ namespace mongo {
const string _name;
+ void truncate(); // Removes extra data after current position
+
private:
#if defined(_WIN32)
typedef HANDLE fd_type;
@@ -45,6 +47,7 @@ namespace mongo {
typedef int fd_type;
#endif
fd_type _fd;
+ bool _direct; // are we using direct I/O
};
}
diff --git a/util/message.cpp b/util/message.cpp
deleted file mode 100644
index bcb1772..0000000
--- a/util/message.cpp
+++ /dev/null
@@ -1,764 +0,0 @@
-/* message
-
- todo: authenticate; encrypt?
-*/
-
-/* Copyright 2009 10gen Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "pch.h"
-#include "message.h"
-#include <time.h>
-#include "../util/goodies.h"
-#include "../util/background.h"
-#include <fcntl.h>
-#include <errno.h>
-#include "../db/cmdline.h"
-#include "../client/dbclient.h"
-#include "../util/time_support.h"
-
-#ifndef _WIN32
-# ifndef __sunos__
-# include <ifaddrs.h>
-# endif
-# include <sys/resource.h>
-# include <sys/stat.h>
-#else
-
-// errno doesn't work for winsock.
-#undef errno
-#define errno WSAGetLastError()
-
-#endif
-
-namespace mongo {
-
- bool noUnixSocket = false;
-
- bool objcheck = false;
-
- void checkTicketNumbers();
-
-// if you want trace output:
-#define mmm(x)
-
-#ifdef MSG_NOSIGNAL
- const int portSendFlags = MSG_NOSIGNAL;
- const int portRecvFlags = MSG_NOSIGNAL;
-#else
- const int portSendFlags = 0;
- const int portRecvFlags = 0;
-#endif
-
- const Listener* Listener::_timeTracker;
-
- string SocketException::toString() const {
- stringstream ss;
- ss << _ei.code << " socket exception [" << _type << "] ";
-
- if ( _server.size() )
- ss << "server [" << _server << "] ";
-
- if ( _extra.size() )
- ss << _extra;
-
- return ss.str();
- }
-
-
- vector<SockAddr> ipToAddrs(const char* ips, int port) {
- vector<SockAddr> out;
- if (*ips == '\0') {
- out.push_back(SockAddr("0.0.0.0", port)); // IPv4 all
-
- if (IPv6Enabled())
- out.push_back(SockAddr("::", port)); // IPv6 all
-#ifndef _WIN32
- if (!noUnixSocket)
- out.push_back(SockAddr(makeUnixSockPath(port).c_str(), port)); // Unix socket
-#endif
- return out;
- }
-
- while(*ips) {
- string ip;
- const char * comma = strchr(ips, ',');
- if (comma) {
- ip = string(ips, comma - ips);
- ips = comma + 1;
- }
- else {
- ip = string(ips);
- ips = "";
- }
-
- SockAddr sa(ip.c_str(), port);
- out.push_back(sa);
-
-#ifndef _WIN32
- if (!noUnixSocket && (sa.getAddr() == "127.0.0.1" || sa.getAddr() == "0.0.0.0")) // only IPv4
- out.push_back(SockAddr(makeUnixSockPath(port).c_str(), port));
-#endif
- }
- return out;
-
- }
-
- /* listener ------------------------------------------------------------------- */
-
- void Listener::initAndListen() {
- checkTicketNumbers();
- vector<SockAddr> mine = ipToAddrs(_ip.c_str(), _port);
- vector<int> socks;
- SOCKET maxfd = 0; // needed for select()
-
- for (vector<SockAddr>::iterator it=mine.begin(), end=mine.end(); it != end; ++it) {
- SockAddr& me = *it;
-
- SOCKET sock = ::socket(me.getType(), SOCK_STREAM, 0);
- if ( sock == INVALID_SOCKET ) {
- log() << "ERROR: listen(): invalid socket? " << errnoWithDescription() << endl;
- }
-
- if (me.getType() == AF_UNIX) {
-#if !defined(_WIN32)
- if (unlink(me.getAddr().c_str()) == -1) {
- int x = errno;
- if (x != ENOENT) {
- log() << "couldn't unlink socket file " << me << errnoWithDescription(x) << " skipping" << endl;
- continue;
- }
- }
-#endif
- }
- else if (me.getType() == AF_INET6) {
- // IPv6 can also accept IPv4 connections as mapped addresses (::ffff:127.0.0.1)
- // That causes a conflict if we don't do set it to IPV6_ONLY
- const int one = 1;
- setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (const char*) &one, sizeof(one));
- }
-
- prebindOptions( sock );
-
- if ( ::bind(sock, me.raw(), me.addressSize) != 0 ) {
- int x = errno;
- log() << "listen(): bind() failed " << errnoWithDescription(x) << " for socket: " << me.toString() << endl;
- if ( x == EADDRINUSE )
- log() << " addr already in use" << endl;
- closesocket(sock);
- return;
- }
-
-#if !defined(_WIN32)
- if (me.getType() == AF_UNIX) {
- if (chmod(me.getAddr().c_str(), 0777) == -1) {
- log() << "couldn't chmod socket file " << me << errnoWithDescription() << endl;
- }
-
- ListeningSockets::get()->addPath( me.getAddr() );
- }
-#endif
-
- if ( ::listen(sock, 128) != 0 ) {
- log() << "listen(): listen() failed " << errnoWithDescription() << endl;
- closesocket(sock);
- return;
- }
-
- ListeningSockets::get()->add( sock );
-
- socks.push_back(sock);
- if (sock > maxfd)
- maxfd = sock;
- }
-
- static long connNumber = 0;
- struct timeval maxSelectTime;
- while ( ! inShutdown() ) {
- fd_set fds[1];
- FD_ZERO(fds);
-
- for (vector<int>::iterator it=socks.begin(), end=socks.end(); it != end; ++it) {
- FD_SET(*it, fds);
- }
-
- maxSelectTime.tv_sec = 0;
- maxSelectTime.tv_usec = 10000;
- const int ret = select(maxfd+1, fds, NULL, NULL, &maxSelectTime);
-
- if (ret == 0) {
-#if defined(__linux__)
- _elapsedTime += ( 10000 - maxSelectTime.tv_usec ) / 1000;
-#else
- _elapsedTime += 10;
-#endif
- continue;
- }
- _elapsedTime += ret; // assume 1ms to grab connection. very rough
-
- if (ret < 0) {
- int x = errno;
-#ifdef EINTR
- if ( x == EINTR ) {
- log() << "select() signal caught, continuing" << endl;
- continue;
- }
-#endif
- if ( ! inShutdown() )
- log() << "select() failure: ret=" << ret << " " << errnoWithDescription(x) << endl;
- return;
- }
-
- for (vector<int>::iterator it=socks.begin(), end=socks.end(); it != end; ++it) {
- if (! (FD_ISSET(*it, fds)))
- continue;
-
- SockAddr from;
- int s = accept(*it, from.raw(), &from.addressSize);
- if ( s < 0 ) {
- int x = errno; // so no global issues
- if ( x == ECONNABORTED || x == EBADF ) {
- log() << "Listener on port " << _port << " aborted" << endl;
- return;
- }
- if ( x == 0 && inShutdown() ) {
- return; // socket closed
- }
- if( !inShutdown() )
- log() << "Listener: accept() returns " << s << " " << errnoWithDescription(x) << endl;
- continue;
- }
- if (from.getType() != AF_UNIX)
- disableNagle(s);
- if ( _logConnect && ! cmdLine.quiet )
- log() << "connection accepted from " << from.toString() << " #" << ++connNumber << endl;
- accepted(s, from);
- }
- }
- }
-
- void Listener::accepted(int sock, const SockAddr& from) {
- accepted( new MessagingPort(sock, from) );
- }
-
- /* messagingport -------------------------------------------------------------- */
-
- class PiggyBackData {
- public:
- PiggyBackData( MessagingPort * port ) {
- _port = port;
- _buf = new char[1300];
- _cur = _buf;
- }
-
- ~PiggyBackData() {
- DESTRUCTOR_GUARD (
- flush();
- delete[]( _cur );
- );
- }
-
- void append( Message& m ) {
- assert( m.header()->len <= 1300 );
-
- if ( len() + m.header()->len > 1300 )
- flush();
-
- memcpy( _cur , m.singleData() , m.header()->len );
- _cur += m.header()->len;
- }
-
- void flush() {
- if ( _buf == _cur )
- return;
-
- _port->send( _buf , len(), "flush" );
- _cur = _buf;
- }
-
- int len() const { return _cur - _buf; }
-
- private:
- MessagingPort* _port;
- char * _buf;
- char * _cur;
- };
-
- class Ports {
- set<MessagingPort*> ports;
- mongo::mutex m;
- public:
- Ports() : ports(), m("Ports") {}
- void closeAll(unsigned skip_mask) {
- scoped_lock bl(m);
- for ( set<MessagingPort*>::iterator i = ports.begin(); i != ports.end(); i++ ) {
- if( (*i)->tag & skip_mask )
- continue;
- (*i)->shutdown();
- }
- }
- void insert(MessagingPort* p) {
- scoped_lock bl(m);
- ports.insert(p);
- }
- void erase(MessagingPort* p) {
- scoped_lock bl(m);
- ports.erase(p);
- }
- };
-
- // we "new" this so it is still be around when other automatic global vars
- // are being destructed during termination.
- Ports& ports = *(new Ports());
-
- void MessagingPort::closeAllSockets(unsigned mask) {
- ports.closeAll(mask);
- }
-
- MessagingPort::MessagingPort(int _sock, const SockAddr& _far) : sock(_sock), piggyBackData(0), _bytesIn(0), _bytesOut(0), farEnd(_far), _timeout(), tag(0) {
- _logLevel = 0;
- ports.insert(this);
- }
-
- MessagingPort::MessagingPort( double timeout, int ll ) : _bytesIn(0), _bytesOut(0), tag(0) {
- _logLevel = ll;
- ports.insert(this);
- sock = -1;
- piggyBackData = 0;
- _timeout = timeout;
- }
-
- void MessagingPort::shutdown() {
- if ( sock >= 0 ) {
- closesocket(sock);
- sock = -1;
- }
- }
-
- MessagingPort::~MessagingPort() {
- if ( piggyBackData )
- delete( piggyBackData );
- shutdown();
- ports.erase(this);
- }
-
- class ConnectBG : public BackgroundJob {
- public:
- ConnectBG(int sock, SockAddr farEnd) : _sock(sock), _farEnd(farEnd) { }
-
- void run() { _res = ::connect(_sock, _farEnd.raw(), _farEnd.addressSize); }
- string name() const { return "ConnectBG"; }
- int inError() const { return _res; }
-
- private:
- int _sock;
- int _res;
- SockAddr _farEnd;
- };
-
- bool MessagingPort::connect(SockAddr& _far) {
- farEnd = _far;
-
- sock = socket(farEnd.getType(), SOCK_STREAM, 0);
- if ( sock == INVALID_SOCKET ) {
- log(_logLevel) << "ERROR: connect invalid socket " << errnoWithDescription() << endl;
- return false;
- }
-
- if ( _timeout > 0 ) {
- setSockTimeouts( sock, _timeout );
- }
-
- ConnectBG bg(sock, farEnd);
- bg.go();
- if ( bg.wait(5000) ) {
- if ( bg.inError() ) {
- closesocket(sock);
- sock = -1;
- return false;
- }
- }
- else {
- // time out the connect
- closesocket(sock);
- sock = -1;
- bg.wait(); // so bg stays in scope until bg thread terminates
- return false;
- }
-
- if (farEnd.getType() != AF_UNIX)
- disableNagle(sock);
-
-#ifdef SO_NOSIGPIPE
- // osx
- const int one = 1;
- setsockopt(sock, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(int));
-#endif
-
- /*
- // SO_LINGER is bad
- #ifdef SO_LINGER
- struct linger ling;
- ling.l_onoff = 1;
- ling.l_linger = 0;
- setsockopt(sock, SOL_SOCKET, SO_LINGER, (char *) &ling, sizeof(ling));
- #endif
- */
- return true;
- }
-
- bool MessagingPort::recv(Message& m) {
- try {
-again:
- mmm( log() << "* recv() sock:" << this->sock << endl; )
- int len = -1;
-
- char *lenbuf = (char *) &len;
- int lft = 4;
- recv( lenbuf, lft );
-
- if ( len < 16 || len > 48000000 ) { // messages must be large enough for headers
- if ( len == -1 ) {
- // Endian check from the client, after connecting, to see what mode server is running in.
- unsigned foo = 0x10203040;
- send( (char *) &foo, 4, "endian" );
- goto again;
- }
-
- if ( len == 542393671 ) {
- // an http GET
- log(_logLevel) << "looks like you're trying to access db over http on native driver port. please add 1000 for webserver" << endl;
- string msg = "You are trying to access MongoDB on the native driver port. For http diagnostic access, add 1000 to the port number\n";
- stringstream ss;
- ss << "HTTP/1.0 200 OK\r\nConnection: close\r\nContent-Type: text/plain\r\nContent-Length: " << msg.size() << "\r\n\r\n" << msg;
- string s = ss.str();
- send( s.c_str(), s.size(), "http" );
- return false;
- }
- log(0) << "recv(): message len " << len << " is too large" << len << endl;
- return false;
- }
-
- int z = (len+1023)&0xfffffc00;
- assert(z>=len);
- MsgData *md = (MsgData *) malloc(z);
- assert(md);
- md->len = len;
-
- char *p = (char *) &md->id;
- int left = len -4;
-
- try {
- recv( p, left );
- }
- catch (...) {
- free(md);
- throw;
- }
-
- _bytesIn += len;
- m.setData(md, true);
- return true;
-
- }
- catch ( const SocketException & e ) {
- log(_logLevel + (e.shouldPrint() ? 0 : 1) ) << "SocketException: remote: " << remote() << " error: " << e << endl;
- m.reset();
- return false;
- }
- }
-
- void MessagingPort::reply(Message& received, Message& response) {
- say(/*received.from, */response, received.header()->id);
- }
-
- void MessagingPort::reply(Message& received, Message& response, MSGID responseTo) {
- say(/*received.from, */response, responseTo);
- }
-
- bool MessagingPort::call(Message& toSend, Message& response) {
- mmm( log() << "*call()" << endl; )
- say(toSend);
- return recv( toSend , response );
- }
-
- bool MessagingPort::recv( const Message& toSend , Message& response ) {
- while ( 1 ) {
- bool ok = recv(response);
- if ( !ok )
- return false;
- //log() << "got response: " << response.data->responseTo << endl;
- if ( response.header()->responseTo == toSend.header()->id )
- break;
- error() << "MessagingPort::call() wrong id got:" << hex << (unsigned)response.header()->responseTo << " expect:" << (unsigned)toSend.header()->id << '\n'
- << dec
- << " toSend op: " << (unsigned)toSend.operation() << '\n'
- << " response msgid:" << (unsigned)response.header()->id << '\n'
- << " response len: " << (unsigned)response.header()->len << '\n'
- << " response op: " << response.operation() << '\n'
- << " farEnd: " << farEnd << endl;
- assert(false);
- response.reset();
- }
- mmm( log() << "*call() end" << endl; )
- return true;
- }
-
- void MessagingPort::say(Message& toSend, int responseTo) {
- assert( !toSend.empty() );
- mmm( log() << "* say() sock:" << this->sock << " thr:" << GetCurrentThreadId() << endl; )
- toSend.header()->id = nextMessageId();
- toSend.header()->responseTo = responseTo;
-
- if ( piggyBackData && piggyBackData->len() ) {
- mmm( log() << "* have piggy back" << endl; )
- if ( ( piggyBackData->len() + toSend.header()->len ) > 1300 ) {
- // won't fit in a packet - so just send it off
- piggyBackData->flush();
- }
- else {
- piggyBackData->append( toSend );
- piggyBackData->flush();
- return;
- }
- }
-
- toSend.send( *this, "say" );
- }
-
- // sends all data or throws an exception
- void MessagingPort::send( const char * data , int len, const char *context ) {
- _bytesOut += len;
- while( len > 0 ) {
- int ret = ::send( sock , data , len , portSendFlags );
- if ( ret == -1 ) {
- if ( errno != EAGAIN || _timeout == 0 ) {
- SocketException::Type t = SocketException::SEND_ERROR;
-#if defined(_WINDOWS)
- if( e == WSAETIMEDOUT ) t = SocketException::SEND_TIMEOUT;
-#endif
- log(_logLevel) << "MessagingPort " << context << " send() " << errnoWithDescription() << ' ' << farEnd.toString() << endl;
- throw SocketException( t );
- }
- else {
- if ( !serverAlive( farEnd.toString() ) ) {
- log(_logLevel) << "MessagingPort " << context << " send() remote dead " << farEnd.toString() << endl;
- throw SocketException( SocketException::SEND_ERROR );
- }
- }
- }
- else {
- assert( ret <= len );
- len -= ret;
- data += ret;
- }
- }
- }
-
- // sends all data or throws an exception
- void MessagingPort::send( const vector< pair< char *, int > > &data, const char *context ) {
-#if defined(_WIN32)
- // TODO use scatter/gather api
- for( vector< pair< char *, int > >::const_iterator i = data.begin(); i != data.end(); ++i ) {
- char * data = i->first;
- int len = i->second;
- send( data, len, context );
- }
-#else
- vector< struct iovec > d( data.size() );
- int i = 0;
- for( vector< pair< char *, int > >::const_iterator j = data.begin(); j != data.end(); ++j ) {
- if ( j->second > 0 ) {
- d[ i ].iov_base = j->first;
- d[ i ].iov_len = j->second;
- ++i;
- }
- }
- struct msghdr meta;
- memset( &meta, 0, sizeof( meta ) );
- meta.msg_iov = &d[ 0 ];
- meta.msg_iovlen = d.size();
-
- while( meta.msg_iovlen > 0 ) {
- int ret = ::sendmsg( sock , &meta , portSendFlags );
- if ( ret == -1 ) {
- if ( errno != EAGAIN || _timeout == 0 ) {
- log(_logLevel) << "MessagingPort " << context << " send() " << errnoWithDescription() << ' ' << farEnd.toString() << endl;
- throw SocketException( SocketException::SEND_ERROR );
- }
- else {
- if ( !serverAlive( farEnd.toString() ) ) {
- log(_logLevel) << "MessagingPort " << context << " send() remote dead " << farEnd.toString() << endl;
- throw SocketException( SocketException::SEND_ERROR );
- }
- }
- }
- else {
- struct iovec *& i = meta.msg_iov;
- while( ret > 0 ) {
- if ( i->iov_len > unsigned( ret ) ) {
- i->iov_len -= ret;
- i->iov_base = (char*)(i->iov_base) + ret;
- ret = 0;
- }
- else {
- ret -= i->iov_len;
- ++i;
- --(meta.msg_iovlen);
- }
- }
- }
- }
-#endif
- }
-
- void MessagingPort::recv( char * buf , int len ) {
- unsigned retries = 0;
- while( len > 0 ) {
- int ret = ::recv( sock , buf , len , portRecvFlags );
- if ( ret > 0 ) {
- if ( len <= 4 && ret != len )
- log(_logLevel) << "MessagingPort recv() got " << ret << " bytes wanted len=" << len << endl;
- assert( ret <= len );
- len -= ret;
- buf += ret;
- }
- else if ( ret == 0 ) {
- log(3) << "MessagingPort recv() conn closed? " << farEnd.toString() << endl;
- throw SocketException( SocketException::CLOSED );
- }
- else { /* ret < 0 */
- int e = errno;
-
-#if defined(EINTR) && !defined(_WIN32)
- if( e == EINTR ) {
- if( ++retries == 1 ) {
- log() << "EINTR retry" << endl;
- continue;
- }
- }
-#endif
- if ( ( e == EAGAIN
-#ifdef _WINDOWS
- || e == WSAETIMEDOUT
-#endif
- ) && _timeout > 0 ) {
- // this is a timeout
- log(_logLevel) << "MessagingPort recv() timeout " << farEnd.toString() <<endl;
- throw SocketException(SocketException::RECV_TIMEOUT);
- }
-
- log(_logLevel) << "MessagingPort recv() " << errnoWithDescription(e) << " " << farEnd.toString() <<endl;
- throw SocketException(SocketException::RECV_ERROR);
- }
- }
- }
-
- int MessagingPort::unsafe_recv( char *buf, int max ) {
- return ::recv( sock , buf , max , portRecvFlags );
- }
-
- void MessagingPort::piggyBack( Message& toSend , int responseTo ) {
-
- if ( toSend.header()->len > 1300 ) {
- // not worth saving because its almost an entire packet
- say( toSend );
- return;
- }
-
- // we're going to be storing this, so need to set it up
- toSend.header()->id = nextMessageId();
- toSend.header()->responseTo = responseTo;
-
- if ( ! piggyBackData )
- piggyBackData = new PiggyBackData( this );
-
- piggyBackData->append( toSend );
- }
-
- unsigned MessagingPort::remotePort() const {
- return farEnd.getPort();
- }
-
- HostAndPort MessagingPort::remote() const {
- if ( _farEndParsed.port() == -1 )
- _farEndParsed = HostAndPort( farEnd );
- return _farEndParsed;
- }
-
-
- MSGID NextMsgId;
-
- struct MsgStart {
- MsgStart() {
- NextMsgId = (((unsigned) time(0)) << 16) ^ curTimeMillis();
- assert(MsgDataHeaderSize == 16);
- }
- } msgstart;
-
- MSGID nextMessageId() {
- MSGID msgid = NextMsgId++;
- return msgid;
- }
-
- bool doesOpGetAResponse( int op ) {
- return op == dbQuery || op == dbGetMore;
- }
-
- const int DEFAULT_MAX_CONN = 20000;
- const int MAX_MAX_CONN = 20000;
-
- int getMaxConnections() {
-#ifdef _WIN32
- return DEFAULT_MAX_CONN;
-#else
- struct rlimit limit;
- assert( getrlimit(RLIMIT_NOFILE,&limit) == 0 );
-
- int max = (int)(limit.rlim_cur * .8);
-
- log(1) << "fd limit"
- << " hard:" << limit.rlim_max
- << " soft:" << limit.rlim_cur
- << " max conn: " << max
- << endl;
-
- if ( max > MAX_MAX_CONN )
- max = MAX_MAX_CONN;
-
- return max;
-#endif
- }
-
- void checkTicketNumbers() {
- int want = getMaxConnections();
- int current = connTicketHolder.outof();
- if ( current != DEFAULT_MAX_CONN ) {
- if ( current < want ) {
- // they want fewer than they can handle
- // which is fine
- log(1) << " only allowing " << current << " connections" << endl;
- return;
- }
- if ( current > want ) {
- log() << " --maxConns too high, can only handle " << want << endl;
- }
- }
- connTicketHolder.resize( want );
- }
-
- TicketHolder connTicketHolder(DEFAULT_MAX_CONN);
-
-} // namespace mongo
diff --git a/util/mmap.cpp b/util/mmap.cpp
index 18edc34..fa9ab73 100644
--- a/util/mmap.cpp
+++ b/util/mmap.cpp
@@ -20,6 +20,7 @@
#include "processinfo.h"
#include "concurrency/rwlock.h"
#include "../db/namespace.h"
+#include "../db/cmdline.h"
namespace mongo {
@@ -61,7 +62,7 @@ namespace mongo {
this is the administrative stuff
*/
- RWLock MongoFile::mmmutex("rw:mmmutex");
+ RWLockRecursive MongoFile::mmmutex("mmmutex",10*60*1000 /* 10 minutes */);
/* subclass must call in destructor (or at close).
removes this from pathToFile and other maps
@@ -69,7 +70,7 @@ namespace mongo {
ideal to call close to the close, if the close is well before object destruction
*/
void MongoFile::destroyed() {
- rwlock lk( mmmutex , true );
+ mmmutex.assertExclusivelyLocked();
mmfiles.erase(this);
pathToFile.erase( filename() );
}
@@ -83,11 +84,12 @@ namespace mongo {
}
++closingAllFiles;
- rwlock lk( mmmutex , true );
+ RWLockRecursive::Exclusive lk(mmmutex);
ProgressMeter pm( mmfiles.size() , 2 , 1 );
- for ( set<MongoFile*>::iterator i = mmfiles.begin(); i != mmfiles.end(); i++ ) {
- (*i)->close();
+ set<MongoFile*> temp = mmfiles;
+ for ( set<MongoFile*>::iterator i = temp.begin(); i != temp.end(); i++ ) {
+ (*i)->close(); // close() now removes from mmfiles
pm.hit();
}
message << "closeAllFiles() finished";
@@ -97,7 +99,7 @@ namespace mongo {
/*static*/ long long MongoFile::totalMappedLength() {
unsigned long long total = 0;
- rwlock lk( mmmutex , false );
+ RWLockRecursive::Shared lk(mmmutex);
for ( set<MongoFile*>::iterator i = mmfiles.begin(); i != mmfiles.end(); i++ )
total += (*i)->length();
@@ -120,7 +122,7 @@ namespace mongo {
/*static*/ int MongoFile::_flushAll( bool sync ) {
if ( ! sync ) {
int num = 0;
- rwlock lk( mmmutex , false );
+ RWLockRecursive::Shared lk(mmmutex);
for ( set<MongoFile*>::iterator i = mmfiles.begin(); i != mmfiles.end(); i++ ) {
num++;
MongoFile * mmf = *i;
@@ -137,7 +139,7 @@ namespace mongo {
while ( true ) {
auto_ptr<Flushable> f;
{
- rwlock lk( mmmutex , false );
+ RWLockRecursive::Shared lk(mmmutex);
for ( set<MongoFile*>::iterator i = mmfiles.begin(); i != mmfiles.end(); i++ ) {
MongoFile * mmf = *i;
if ( ! mmf )
@@ -158,12 +160,12 @@ namespace mongo {
}
void MongoFile::created() {
- rwlock lk( mmmutex , true );
+ RWLockRecursive::Exclusive lk(mmmutex);
mmfiles.insert(this);
}
void MongoFile::setFilename(string fn) {
- rwlock( mmmutex, true );
+ RWLockRecursive::Exclusive lk(mmmutex);
assert( _filename.empty() );
_filename = fn;
MongoFile *&ptf = pathToFile[fn];
@@ -173,7 +175,9 @@ namespace mongo {
#if defined(_DEBUG)
void MongoFile::markAllWritable() {
- rwlock lk( mmmutex , false );
+ if( cmdLine.dur )
+ return;
+ RWLockRecursive::Shared lk(mmmutex);
for ( set<MongoFile*>::iterator i = mmfiles.begin(); i != mmfiles.end(); i++ ) {
MongoFile * mmf = *i;
if (mmf) mmf->_lock();
@@ -181,7 +185,9 @@ namespace mongo {
}
void MongoFile::unmarkAllWritable() {
- rwlock lk( mmmutex , false );
+ if( cmdLine.dur )
+ return;
+ RWLockRecursive::Shared lk(mmmutex);
for ( set<MongoFile*>::iterator i = mmfiles.begin(); i != mmfiles.end(); i++ ) {
MongoFile * mmf = *i;
if (mmf) mmf->_unlock();
diff --git a/util/mmap.h b/util/mmap.h
index 2ef4176..c1b14bb 100644
--- a/util/mmap.h
+++ b/util/mmap.h
@@ -21,6 +21,15 @@
namespace mongo {
+ class MAdvise { // scoped advice object for a memory range; only declared here, members are defined elsewhere
+ void *_p; // presumably the start of the advised range - confirm in the implementation
+ unsigned _len; // presumably the length of the range in bytes - confirm in the implementation
+ public:
+ enum Advice { Sequential=1 }; // the only advice kind declared
+ MAdvise(void *p, unsigned len, Advice a);
+ ~MAdvise(); // destructor resets the range to MADV_NORMAL
+ };
+
/* the administrative-ish stuff here */
class MongoFile : boost::noncopyable {
public:
@@ -44,7 +53,8 @@ namespace mongo {
template < class F >
static void forEach( F fun );
- /** note: you need to be in mmmutex when using this. forEach (above) handles that for you automatically. */
+ /** note: you need to be in mmmutex when using this. forEach (above) handles that for you automatically.
+*/
static set<MongoFile*>& getAllFiles() { return mmfiles; }
// callbacks if you need them
@@ -100,7 +110,9 @@ namespace mongo {
static set<MongoFile*> mmfiles;
public:
static map<string,MongoFile*> pathToFile;
- static RWLock mmmutex;
+
+ // lock order: lock dbMutex before this if you lock both
+ static RWLockRecursive mmmutex;
};
/** look up a MMF by filename. scoped mutex locking convention.
@@ -111,7 +123,7 @@ namespace mongo {
*/
class MongoFileFinder : boost::noncopyable {
public:
- MongoFileFinder() : _lk(MongoFile::mmmutex,false) { }
+ MongoFileFinder() : _lk(MongoFile::mmmutex) { }
/** @return The MongoFile object associated with the specified file name. If no file is open
with the specified name, returns null.
@@ -122,7 +134,7 @@ namespace mongo {
}
private:
- rwlock _lk;
+ RWLockRecursive::Shared _lk;
};
struct MongoFileAllowWrites {
@@ -135,11 +147,18 @@ namespace mongo {
};
class MemoryMappedFile : public MongoFile {
+ protected:
+ /** @return the view passed to msync / the Flushable objects by flush() and prepareFlush():
+ * 0 when nothing is mapped, otherwise the single mapped view.
+ * asserts if more than one view exists.
+ */
+ virtual void* viewForFlushing() {
+ if( views.size() == 0 )
+ return 0;
+ assert( views.size() == 1 );
+ return views[0];
+ }
public:
MemoryMappedFile();
virtual ~MemoryMappedFile() {
- destroyed(); // cleans up from the master list of mmaps
+ RWLockRecursive::Exclusive lk(mmmutex);
close();
}
@@ -147,6 +166,8 @@ namespace mongo {
// Throws exception if file doesn't exist. (dm may2010: not sure if this is always true?)
void* map(const char *filename);
+
+ /** @param options see MongoFile::Options */
void* mapWithOptions(const char *filename, int options);
/* Creates with length if DNE, otherwise uses existing file length,
@@ -187,7 +208,7 @@ namespace mongo {
HANDLE maphandle;
vector<void *> views;
unsigned long long len;
-
+
#ifdef _WIN32
boost::shared_ptr<mutex> _flushMutex;
void clearWritableBits(void *privateView);
@@ -212,7 +233,7 @@ namespace mongo {
/** p is called from within a mutex that MongoFile uses. so be careful not to deadlock. */
template < class F >
inline void MongoFile::forEach( F p ) {
- rwlock lk( mmmutex , false );
+ RWLockRecursive::Shared lklk(mmmutex);
for ( set<MongoFile*>::iterator i = mmfiles.begin(); i != mmfiles.end(); i++ )
p(*i);
}
@@ -227,10 +248,11 @@ namespace mongo {
bool get(unsigned i) const {
unsigned x = i / 32;
assert( x < MemoryMappedFile::NChunks );
- return bits[x] & (1 << (i%32));
+ return (bits[x] & (1 << (i%32))) != 0;
}
void set(unsigned i) {
unsigned x = i / 32;
+ wassert( x < (MemoryMappedFile::NChunks*2/3) ); // warn if getting close to limit
assert( x < MemoryMappedFile::NChunks );
bits[x] |= (1 << (i%32));
}
diff --git a/util/mmap_posix.cpp b/util/mmap_posix.cpp
index f47a06f..5b5e86d 100644
--- a/util/mmap_posix.cpp
+++ b/util/mmap_posix.cpp
@@ -19,13 +19,12 @@
#include "mmap.h"
#include "file_allocator.h"
#include "../db/concurrency.h"
-
#include <errno.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
-
+#include "../util/processinfo.h"
#include "mongoutils/str.h"
using namespace mongoutils;
@@ -39,6 +38,7 @@ namespace mongo {
}
void MemoryMappedFile::close() {
+ mmmutex.assertExclusivelyLocked();
for( vector<void*>::iterator i = views.begin(); i != views.end(); i++ ) {
munmap(*i,len);
}
@@ -47,6 +47,7 @@ namespace mongo {
if ( fd )
::close(fd);
fd = 0;
+ destroyed(); // cleans up from the master list of mmaps
}
#ifndef O_NOATIME
@@ -57,6 +58,19 @@ namespace mongo {
#define MAP_NORESERVE (0)
#endif
+#if defined(__sunos__)
+ MAdvise::MAdvise(void *,unsigned, Advice) { }
+ MAdvise::~MAdvise() { }
+#else
+ // remember the range and advise the kernel it will be read sequentially.
+ // the return value of madvise() is not checked.
+ MAdvise::MAdvise(void *p, unsigned len, Advice a) : _p(p), _len(len) {
+ assert( a == Sequential ); // more later
+ madvise(_p,_len,MADV_SEQUENTIAL);
+ }
+ // put the same range back to the default access pattern
+ MAdvise::~MAdvise() {
+ madvise(_p,_len,MADV_NORMAL);
+ }
+#endif
+
void* MemoryMappedFile::map(const char *filename, unsigned long long &length, int options) {
// length may be updated by callee.
setFilename(filename);
@@ -68,6 +82,7 @@ namespace mongo {
fd = open(filename, O_RDWR | O_NOATIME);
if ( fd <= 0 ) {
log() << "couldn't open " << filename << ' ' << errnoWithDescription() << endl;
+ fd = 0; // our sentinel for not opened
return 0;
}
@@ -149,6 +164,7 @@ namespace mongo {
int err = errno;
error() << "13601 Couldn't remap private view: " << errnoWithDescription(err) << endl;
log() << "aborting" << endl;
+ printMemInfo();
abort();
}
assert( x == oldPrivateAddr );
@@ -158,7 +174,7 @@ namespace mongo {
void MemoryMappedFile::flush(bool sync) {
if ( views.empty() || fd == 0 )
return;
- if ( msync(views[0], len, sync ? MS_SYNC : MS_ASYNC) )
+ if ( msync(viewForFlushing(), len, sync ? MS_SYNC : MS_ASYNC) )
problem() << "msync " << errnoWithDescription() << endl;
}
@@ -181,7 +197,7 @@ namespace mongo {
};
MemoryMappedFile::Flushable * MemoryMappedFile::prepareFlush() {
- return new PosixFlushable( views.empty() ? 0 : views[0] , fd , len );
+ return new PosixFlushable( viewForFlushing() , fd , len );
}
void MemoryMappedFile::_lock() {
diff --git a/util/mmap_win.cpp b/util/mmap_win.cpp
index 0b0b834..9173d7b 100644
--- a/util/mmap_win.cpp
+++ b/util/mmap_win.cpp
@@ -18,7 +18,6 @@
#include "pch.h"
#include "mmap.h"
#include "text.h"
-#include <windows.h>
#include "../db/mongommf.h"
#include "../db/concurrency.h"
@@ -27,6 +26,9 @@ namespace mongo {
mutex mapViewMutex("mapView");
ourbitset writable;
+ MAdvise::MAdvise(void *,unsigned, Advice) { }
+ MAdvise::~MAdvise() { }
+
/** notification on unmapping so we can clear writable bits */
void MemoryMappedFile::clearWritableBits(void *p) {
for( unsigned i = ((size_t)p)/ChunkSize; i <= (((size_t)p)+len)/ChunkSize; i++ ) {
@@ -44,6 +46,7 @@ namespace mongo {
}
void MemoryMappedFile::close() {
+ mmmutex.assertExclusivelyLocked();
for( vector<void*>::iterator i = views.begin(); i != views.end(); i++ ) {
clearWritableBits(*i);
UnmapViewOfFile(*i);
@@ -55,6 +58,7 @@ namespace mongo {
if ( fd )
CloseHandle(fd);
fd = 0;
+ destroyed(); // cleans up from the master list of mmaps
}
unsigned long long mapped = 0;
@@ -138,7 +142,8 @@ namespace mongo {
}
if ( view == 0 ) {
DWORD e = GetLastError();
- log() << "MapViewOfFile failed " << filename << " " << errnoWithDescription(e) << endl;
+ log() << "MapViewOfFile failed " << filename << " " << errnoWithDescription(e) <<
+ ((sizeof(void*)==4)?" (32 bit build)":"") << endl;
close();
}
else {
@@ -183,13 +188,13 @@ namespace mongo {
void MemoryMappedFile::flush(bool sync) {
uassert(13056, "Async flushing not supported on windows", sync);
if( !views.empty() ) {
- WindowsFlushable f( views[0] , fd , filename() , _flushMutex);
+ WindowsFlushable f( viewForFlushing() , fd , filename() , _flushMutex);
f.flush();
}
}
MemoryMappedFile::Flushable * MemoryMappedFile::prepareFlush() {
- return new WindowsFlushable( views.empty() ? 0 : views[0] , fd , filename() , _flushMutex );
+ return new WindowsFlushable( viewForFlushing() , fd , filename() , _flushMutex );
}
void MemoryMappedFile::_lock() {}
void MemoryMappedFile::_unlock() {}
diff --git a/util/mongoutils/README b/util/mongoutils/README
index fd2a589..f61277c 100755
--- a/util/mongoutils/README
+++ b/util/mongoutils/README
@@ -11,3 +11,5 @@
So basically, easy to use, general purpose stuff, with no arduous dependencies to drop into
any new project.
+
+ *** PLACE UNIT TESTS IN mongoutils/test.cpp ***
diff --git a/util/mongoutils/str.h b/util/mongoutils/str.h
index ea8f938..57b94fa 100644
--- a/util/mongoutils/str.h
+++ b/util/mongoutils/str.h
@@ -29,6 +29,8 @@
#include <string>
#include <sstream>
+
+// this violates the README rules for mongoutils:
#include "../../bson/util/builder.h"
namespace mongoutils {
@@ -48,13 +50,11 @@ namespace mongoutils {
class stream {
public:
mongo::StringBuilder ss;
-
template<class T>
stream& operator<<(const T& v) {
ss << v;
return *this;
}
-
operator std::string () const { return ss.str(); }
};
@@ -106,13 +106,13 @@ namespace mongoutils {
return strchr(s.c_str(), x) != 0;
}
- /** @return everything befor the character x, else entire string */
+ /** @return everything before the character x, else entire string */
inline string before(const string& s, char x) {
const char *p = strchr(s.c_str(), x);
return (p != 0) ? s.substr(0, p-s.c_str()) : s;
}
- /** @return everything befor the string x, else entire string */
+ /** @return everything before the string x, else entire string */
inline string before(const string& s, const string& x) {
const char *p = strstr(s.c_str(), x.c_str());
return (p != 0) ? s.substr(0, p-s.c_str()) : s;
diff --git a/util/mongoutils/test.cpp b/util/mongoutils/test.cpp
index d8ee461..45268c5 100644
--- a/util/mongoutils/test.cpp
+++ b/util/mongoutils/test.cpp
@@ -19,9 +19,9 @@
* limitations under the License.
*/
+#include <assert.h>
#include "str.h"
#include "html.h"
-#include <assert.h>
using namespace std;
using namespace mongoutils;
diff --git a/util/hostandport.h b/util/net/hostandport.h
index fd27296..573e8ee 100644
--- a/util/hostandport.h
+++ b/util/net/hostandport.h
@@ -18,8 +18,8 @@
#pragma once
#include "sock.h"
-#include "../db/cmdline.h"
-#include "mongoutils/str.h"
+#include "../../db/cmdline.h"
+#include "../mongoutils/str.h"
namespace mongo {
@@ -70,8 +70,10 @@ namespace mongo {
bool isLocalHost() const;
- // @returns host:port
- string toString() const;
+ /**
+ * @param includePort host:port if true, host otherwise
+ */
+ string toString( bool includePort=true ) const;
operator string() const { return toString(); }
@@ -87,24 +89,6 @@ namespace mongo {
int _port; // -1 indicates unspecified
};
- /** returns true if strings seem to be the same hostname.
- "nyc1", "nyc1.acme", and "nyc1.acme.com" are treated as the same.
- */
- inline bool sameHostname(const string& a, const string& b) {
- size_t prefixLen = str::shareCommonPrefix(a.c_str(), b.c_str());
-
- if (prefixLen == a.size()) { // (a == b) or (a isPrefixOf b)
- if ( b[prefixLen] == '.' || b[prefixLen] == '\0')
- return true;
- }
- else if(prefixLen == b.size()) { // (b isPrefixOf a)
- if ( a[prefixLen] == '.') // can't be '\0'
- return true;
- }
-
- return false;
- }
-
inline HostAndPort HostAndPort::Me() {
const char* ips = cmdLine.bind_ip.c_str();
while(*ips) {
@@ -130,7 +114,10 @@ namespace mongo {
return HostAndPort(h, cmdLine.port);
}
- inline string HostAndPort::toString() const {
+ inline string HostAndPort::toString( bool includePort ) const {
+ if ( ! includePort )
+ return _host;
+
stringstream ss;
ss << _host;
if ( _port != -1 ) {
@@ -150,7 +137,12 @@ namespace mongo {
}
inline bool HostAndPort::isLocalHost() const {
- return _host == "localhost" || startsWith(_host.c_str(), "127.") || _host == "::1";
+ return ( _host == "localhost"
+ || startsWith(_host.c_str(), "127.")
+ || _host == "::1"
+ || _host == "anonymous unix socket"
+ || _host.c_str()[0] == '/' // unix socket
+ );
}
inline HostAndPort::HostAndPort(string s) {
diff --git a/util/httpclient.cpp b/util/net/httpclient.cpp
index 61d5671..16eaa0a 100644
--- a/util/httpclient.cpp
+++ b/util/net/httpclient.cpp
@@ -19,7 +19,9 @@
#include "httpclient.h"
#include "sock.h"
#include "message.h"
-#include "../bson/util/builder.h"
+#include "message_port.h"
+#include "../mongoutils/str.h"
+#include "../../bson/util/builder.h"
namespace mongo {
@@ -36,8 +38,15 @@ namespace mongo {
}
int HttpClient::_go( const char * command , string url , const char * body , Result * result ) {
- uassert( 10271 , "invalid url" , url.find( "http://" ) == 0 );
- url = url.substr( 7 );
+ bool ssl = false;
+ if ( url.find( "https://" ) == 0 ) {
+ ssl = true;
+ url = url.substr( 8 );
+ }
+ else {
+ uassert( 10271 , "invalid url" , url.find( "http://" ) == 0 );
+ url = url.substr( 7 );
+ }
string host , path;
if ( url.find( "/" ) == string::npos ) {
@@ -54,7 +63,7 @@ namespace mongo {
HD( "path [" << path << "]" );
string server = host;
- int port = 80;
+ int port = ssl ? 443 : 80;
string::size_type idx = host.find( ":" );
if ( idx != string::npos ) {
@@ -87,18 +96,27 @@ namespace mongo {
SockAddr addr( server.c_str() , port );
HD( "addr: " << addr.toString() );
- MessagingPort p;
- if ( ! p.connect( addr ) )
+ Socket sock;
+ if ( ! sock.connect( addr ) )
return -1;
+
+ if ( ssl ) {
+#ifdef MONGO_SSL
+ _checkSSLManager();
+ sock.secure( _sslManager.get() );
+#else
+ uasserted( 15862 , "no ssl support" );
+#endif
+ }
{
const char * out = req.c_str();
int toSend = req.size();
- p.send( out , toSend, "_go" );
+ sock.send( out , toSend, "_go" );
}
char buf[4096];
- int got = p.unsafe_recv( buf , 4096 );
+ int got = sock.unsafe_recv( buf , 4096 );
buf[got] = 0;
int rc;
@@ -110,7 +128,7 @@ namespace mongo {
if ( result )
sb << buf;
- while ( ( got = p.unsafe_recv( buf , 4096 ) ) > 0) {
+ while ( ( got = sock.unsafe_recv( buf , 4096 ) ) > 0) {
if ( result )
sb << buf;
}
@@ -141,10 +159,19 @@ namespace mongo {
if ( h.size() == 0 )
break;
+
+ i = h.find( ':' );
+ if ( i != string::npos )
+ _headers[h.substr(0,i)] = str::ltrim(h.substr(i+1));
}
_body = entire;
}
+#ifdef MONGO_SSL
+ // replaces _sslManager with a newly constructed SSLManager on every call (i.e. once per https request).
+ // NOTE(review): the name suggests lazy creation, but an existing manager is not reused -- confirm intent.
+ // presumably the 'true' argument selects client mode; verify against SSLManager's constructor.
+ void HttpClient::_checkSSLManager() {
+ _sslManager.reset( new SSLManager( true ) );
+ }
+#endif
}
diff --git a/util/httpclient.h b/util/net/httpclient.h
index d66544e..c3f8c82 100644
--- a/util/httpclient.h
+++ b/util/net/httpclient.h
@@ -17,13 +17,16 @@
#pragma once
-#include "../pch.h"
+#include "../../pch.h"
+#include "sock.h"
namespace mongo {
- class HttpClient {
+ class HttpClient : boost::noncopyable {
public:
+ typedef map<string,string> Headers;
+
class Result {
public:
Result() {}
@@ -32,7 +35,7 @@ namespace mongo {
return _entireResponse;
}
- const map<string,string> getHeaders() const {
+ const Headers getHeaders() const {
return _headers;
}
@@ -47,7 +50,7 @@ namespace mongo {
int _code;
string _entireResponse;
- map<string,string> _headers;
+ Headers _headers;
string _body;
friend class HttpClient;
@@ -66,6 +69,11 @@ namespace mongo {
private:
int _go( const char * command , string url , const char * body , Result * result );
+#ifdef MONGO_SSL
+ void _checkSSLManager();
+
+ scoped_ptr<SSLManager> _sslManager;
+#endif
};
}
diff --git a/util/net/listen.cpp b/util/net/listen.cpp
new file mode 100644
index 0000000..6ee25b4
--- /dev/null
+++ b/util/net/listen.cpp
@@ -0,0 +1,391 @@
+// listen.cpp
+
+/* Copyright 2009 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#include "pch.h"
+#include "listen.h"
+#include "message_port.h"
+
+#ifndef _WIN32
+
+# ifndef __sunos__
+# include <ifaddrs.h>
+# endif
+# include <sys/resource.h>
+# include <sys/stat.h>
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <arpa/inet.h>
+#include <errno.h>
+#include <netdb.h>
+#ifdef __openbsd__
+# include <sys/uio.h>
+#endif
+
+#else
+
+// errno doesn't work for winsock.
+#undef errno
+#define errno WSAGetLastError()
+
+#endif
+
+namespace mongo {
+
+
+ void checkTicketNumbers();
+
+
+ // ----- Listener -------
+
+ const Listener* Listener::_timeTracker;
+
+ /**
+ * build the list of addresses to listen on.
+ * @param ips comma separated list of addresses; an empty string means "all interfaces"
+ * @param port port used for every address (also passed to makeUnixSockPath())
+ * @param useUnixSockets if true, a unix domain socket address is added as well (never on windows)
+ * @return one SockAddr per address to bind
+ */
+ vector<SockAddr> ipToAddrs(const char* ips, int port, bool useUnixSockets) {
+ vector<SockAddr> out;
+ if (*ips == '\0') {
+ out.push_back(SockAddr("0.0.0.0", port)); // IPv4 all
+
+ if (IPv6Enabled())
+ out.push_back(SockAddr("::", port)); // IPv6 all
+#ifndef _WIN32
+ if (useUnixSockets)
+ out.push_back(SockAddr(makeUnixSockPath(port).c_str(), port)); // Unix socket
+#endif
+ return out;
+ }
+
+ // walk the list one comma separated entry at a time
+ while(*ips) {
+ string ip;
+ const char * comma = strchr(ips, ',');
+ if (comma) {
+ ip = string(ips, comma - ips);
+ ips = comma + 1;
+ }
+ else {
+ // last entry: take the rest and make the loop condition fail
+ ip = string(ips);
+ ips = "";
+ }
+
+ SockAddr sa(ip.c_str(), port);
+ out.push_back(sa);
+
+#ifndef _WIN32
+ // a unix socket is added for each entry that is 127.0.0.1 or 0.0.0.0,
+ // so listing both yields the same socket path twice
+ if (useUnixSockets && (sa.getAddr() == "127.0.0.1" || sa.getAddr() == "0.0.0.0")) // only IPv4
+ out.push_back(SockAddr(makeUnixSockPath(port).c_str(), port));
+#endif
+ }
+ return out;
+
+ }
+
+ /**
+ * store the configuration only; no socket is opened until initAndListen().
+ * with MONGO_SSL, ssl is switched on for the normal port when cmdLine.sslOnNormalPorts
+ * is set and cmdLine.sslServerManager is non-null.
+ */
+ Listener::Listener(const string& name, const string &ip, int port, bool logConnect )
+ : _port(port), _name(name), _ip(ip), _logConnect(logConnect), _elapsedTime(0) {
+#ifdef MONGO_SSL
+ _ssl = 0;
+ _sslPort = 0;
+
+ if ( cmdLine.sslOnNormalPorts && cmdLine.sslServerManager ) {
+ secure( cmdLine.sslServerManager );
+ }
+#endif
+ }
+
+ // unregister as the time tracker so getElapsedTimeMillis() doesn't use a destroyed Listener
+ Listener::~Listener() {
+ if ( _timeTracker == this )
+ _timeTracker = 0;
+ }
+
+#ifdef MONGO_SSL
+ // use ssl on the normal port(s): with _sslPort left at 0, initAndListen() secures every accepted socket
+ void Listener::secure( SSLManager* manager ) {
+ _ssl = manager;
+ }
+
+ // keep the normal port and additionally listen on additionalPort;
+ // initAndListen() secures only the sockets accepted on that extra port
+ void Listener::addSecurePort( SSLManager* manager , int additionalPort ) {
+ _ssl = manager;
+ _sslPort = additionalPort;
+ }
+
+#endif
+
+ /**
+ * create, configure, bind and listen() on one socket per address in mine.
+ * every socket that reaches the listening state is registered with ListeningSockets
+ * and appended to socks.
+ * a unix socket whose stale socket file can't be unlinked is skipped (its descriptor is
+ * closed); a bind() or listen() failure stops the setup.
+ * @return false if a bind() or listen() call failed, true otherwise
+ */
+ bool Listener::_setupSockets( const vector<SockAddr>& mine , vector<int>& socks ) {
+ for (vector<SockAddr>::const_iterator it=mine.begin(), end=mine.end(); it != end; ++it) {
+ const SockAddr& me = *it;
+
+ SOCKET sock = ::socket(me.getType(), SOCK_STREAM, 0);
+ massert( 15863 , str::stream() << "listen(): invalid socket? " << errnoWithDescription() , sock >= 0 );
+
+ if (me.getType() == AF_UNIX) {
+#if !defined(_WIN32)
+ // remove a socket file left behind by a previous run; a missing file is fine
+ if (unlink(me.getAddr().c_str()) == -1) {
+ int x = errno;
+ if (x != ENOENT) {
+ log() << "couldn't unlink socket file " << me << errnoWithDescription(x) << " skipping" << endl;
+ // this address is skipped, so release the descriptor created above instead of leaking it
+ closesocket(sock);
+ continue;
+ }
+ }
+#endif
+ }
+ else if (me.getType() == AF_INET6) {
+ // IPv6 can also accept IPv4 connections as mapped addresses (::ffff:127.0.0.1)
+ // That causes a conflict if we don't set it to IPV6_ONLY
+ const int one = 1;
+ setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (const char*) &one, sizeof(one));
+ }
+
+#if !defined(_WIN32)
+ {
+ const int one = 1;
+ if ( setsockopt( sock , SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0 )
+ out() << "Failed to set socket opt, SO_REUSEADDR" << endl;
+ }
+#endif
+
+ if ( ::bind(sock, me.raw(), me.addressSize) != 0 ) {
+ int x = errno;
+ error() << "listen(): bind() failed " << errnoWithDescription(x) << " for socket: " << me.toString() << endl;
+ if ( x == EADDRINUSE )
+ error() << " addr already in use" << endl;
+ closesocket(sock);
+ return false;
+ }
+
+#if !defined(_WIN32)
+ if (me.getType() == AF_UNIX) {
+ // a failed chmod is only logged; the path is still recorded so closeAll() can remove the file
+ if (chmod(me.getAddr().c_str(), 0777) == -1) {
+ error() << "couldn't chmod socket file " << me << errnoWithDescription() << endl;
+ }
+ ListeningSockets::get()->addPath( me.getAddr() );
+ }
+#endif
+
+ if ( ::listen(sock, 128) != 0 ) {
+ error() << "listen(): listen() failed " << errnoWithDescription() << endl;
+ closesocket(sock);
+ return false;
+ }
+
+ ListeningSockets::get()->add( sock );
+
+ socks.push_back(sock);
+ }
+
+ return true;
+ }
+
+ /**
+ * open the listening sockets for _port (plus _sslPort when built with MONGO_SSL and one was
+ * added), then loop on select() until inShutdown(), handing every accepted connection to
+ * accepted(). also advances _elapsedTime, the rough clock behind getMyElapsedTimeMillis().
+ * returns early if socket setup fails, select() fails, or accept() reports the listening
+ * socket is gone.
+ */
+ void Listener::initAndListen() {
+ checkTicketNumbers();
+ vector<int> socks;
+ set<int> sslSocks;
+
+ { // normal sockets
+ vector<SockAddr> mine = ipToAddrs(_ip.c_str(), _port, (!cmdLine.noUnixSocket && useUnixSockets()));
+ if ( ! _setupSockets( mine , socks ) )
+ return;
+ }
+
+#ifdef MONGO_SSL
+ if ( _ssl && _sslPort > 0 ) {
+ unsigned prev = socks.size();
+
+ vector<SockAddr> mine = ipToAddrs(_ip.c_str(), _sslPort, false );
+ if ( ! _setupSockets( mine , socks ) )
+ return;
+
+ // everything appended by the call above listens on the ssl port
+ for ( unsigned i=prev; i<socks.size(); i++ ) {
+ sslSocks.insert( socks[i] );
+ }
+
+ }
+#endif
+
+ SOCKET maxfd = 0; // needed for select()
+ for ( unsigned i=0; i<socks.size(); i++ ) {
+ if ( socks[i] > maxfd )
+ maxfd = socks[i];
+ }
+
+#ifdef MONGO_SSL
+ if ( _ssl == 0 ) {
+ _logListen( _port , false );
+ }
+ else if ( _sslPort == 0 ) {
+ _logListen( _port , true );
+ }
+ else {
+ // both
+ _logListen( _port , false );
+ _logListen( _sslPort , true );
+ }
+#else
+ _logListen( _port , false );
+#endif
+
+ // function-local static: the counter is shared by every Listener in the process
+ static long connNumber = 0;
+ struct timeval maxSelectTime;
+ while ( ! inShutdown() ) {
+ // the fd set and the timeout are rebuilt on every pass, before select() is called again
+ fd_set fds[1];
+ FD_ZERO(fds);
+
+ for (vector<int>::iterator it=socks.begin(), end=socks.end(); it != end; ++it) {
+ FD_SET(*it, fds);
+ }
+
+ // wait at most 10ms so the inShutdown() check and the clock keep ticking
+ maxSelectTime.tv_sec = 0;
+ maxSelectTime.tv_usec = 10000;
+ const int ret = select(maxfd+1, fds, NULL, NULL, &maxSelectTime);
+
+ if (ret == 0) {
+ // timed out with nothing to accept
+#if defined(__linux__)
+ // the linux branch treats what is left in maxSelectTime as the unslept time
+ _elapsedTime += ( 10000 - maxSelectTime.tv_usec ) / 1000;
+#else
+ _elapsedTime += 10;
+#endif
+ continue;
+ }
+
+ if (ret < 0) {
+ int x = errno;
+#ifdef EINTR
+ if ( x == EINTR ) {
+ log() << "select() signal caught, continuing" << endl;
+ continue;
+ }
+#endif
+ if ( ! inShutdown() )
+ log() << "select() failure: ret=" << ret << " " << errnoWithDescription(x) << endl;
+ return;
+ }
+
+#if defined(__linux__)
+ _elapsedTime += max(ret, (int)(( 10000 - maxSelectTime.tv_usec ) / 1000));
+#else
+ _elapsedTime += ret; // assume 1ms to grab connection. very rough
+#endif
+
+ // accept one connection from every listening socket select() flagged
+ for (vector<int>::iterator it=socks.begin(), end=socks.end(); it != end; ++it) {
+ if (! (FD_ISSET(*it, fds)))
+ continue;
+
+ SockAddr from;
+ int s = accept(*it, from.raw(), &from.addressSize);
+ if ( s < 0 ) {
+ int x = errno; // so no global issues
+ if ( x == ECONNABORTED || x == EBADF ) {
+ log() << "Listener on port " << _port << " aborted" << endl;
+ return;
+ }
+ if ( x == 0 && inShutdown() ) {
+ return; // socket closed
+ }
+ if( !inShutdown() ) {
+ log() << "Listener: accept() returns " << s << " " << errnoWithDescription(x) << endl;
+ if (x == EMFILE || x == ENFILE) {
+ // Connection still in listen queue but we can't accept it yet
+ error() << "Out of file descriptors. Waiting one second before trying to accept more connections." << warnings;
+ sleepsecs(1);
+ }
+ }
+ continue;
+ }
+ if (from.getType() != AF_UNIX)
+ disableNagle(s);
+ if ( _logConnect && ! cmdLine.quiet )
+ log() << "connection accepted from " << from.toString() << " #" << ++connNumber << endl;
+
+ Socket newSock = Socket(s, from);
+#ifdef MONGO_SSL
+ // secure the connection when ssl covers the normal port (_sslPort == 0)
+ // or when it arrived on one of the dedicated ssl sockets
+ if ( _ssl && ( _sslPort == 0 || sslSocks.count(*it) ) ) {
+ newSock.secureAccepted( _ssl );
+ }
+#endif
+ accepted( newSock );
+ }
+ }
+ }
+
+ // log the "waiting for connections" line for one port, prefixed with _name when it is non-empty
+ void Listener::_logListen( int port , bool ssl ) {
+ log() << _name << ( _name.size() ? " " : "" ) << "waiting for connections on port " << port << ( ssl ? " ssl" : "" ) << endl;
+ }
+
+
+ // default handling of a new connection: wrap the socket in a heap-allocated MessagingPort
+ // and forward it to accepted(MessagingPort*).
+ // NOTE(review): nothing here deletes the MessagingPort; presumably the override takes ownership -- confirm
+ void Listener::accepted(Socket socket) {
+ accepted( new MessagingPort(socket) );
+ }
+
+ // base implementation always asserts: a subclass has to override one of the accepted() overloads
+ void Listener::accepted(MessagingPort *mp) {
+ assert(!"You must overwrite one of the accepted methods");
+ }
+
+ // ----- ListeningSockets -------
+
+ ListeningSockets* ListeningSockets::_instance = new ListeningSockets();
+
+ // @return the process-wide instance created during static initialization of _instance
+ ListeningSockets* ListeningSockets::get() {
+ return _instance;
+ }
+
+ // ------ connection ticket and control ------
+
+ const int DEFAULT_MAX_CONN = 20000;
+ const int MAX_MAX_CONN = 20000;
+
+ /**
+ * @return the number of connections to allow: DEFAULT_MAX_CONN on windows, otherwise
+ * 80% of the soft RLIMIT_NOFILE limit, capped at MAX_MAX_CONN
+ */
+ int getMaxConnections() {
+#ifdef _WIN32
+ return DEFAULT_MAX_CONN;
+#else
+ struct rlimit limit;
+ // call getrlimit() outside of assert() so it still runs if assert is ever compiled out
+ const int rc = getrlimit(RLIMIT_NOFILE,&limit);
+ assert( rc == 0 );
+
+ // cap while still in floating point: rlim_cur can be RLIM_INFINITY (or simply larger
+ // than an int holds), and converting an out-of-range double to int is undefined behavior
+ const double scaled = limit.rlim_cur * .8;
+ const int max = scaled > MAX_MAX_CONN ? MAX_MAX_CONN : (int)scaled;
+
+ log(1) << "fd limit"
+ << " hard:" << limit.rlim_max
+ << " soft:" << limit.rlim_cur
+ << " max conn: " << max
+ << endl;
+
+ return max;
+#endif
+ }
+
+ /**
+ * reconcile the size of connTicketHolder with what getMaxConnections() says can be handled.
+ * a size other than DEFAULT_MAX_CONN is kept when it is lower than that limit and
+ * logged as too high when it is above it; in every other case the holder is resized to the limit.
+ */
+ void checkTicketNumbers() {
+ int want = getMaxConnections();
+ int current = connTicketHolder.outof();
+ if ( current != DEFAULT_MAX_CONN ) {
+ if ( current < want ) {
+ // they want fewer than they can handle
+ // which is fine
+ log(1) << " only allowing " << current << " connections" << endl;
+ return;
+ }
+ if ( current > want ) {
+ log() << " --maxConns too high, can only handle " << want << endl;
+ }
+ }
+ connTicketHolder.resize( want );
+ }
+
+ TicketHolder connTicketHolder(DEFAULT_MAX_CONN);
+
+}
diff --git a/util/net/listen.h b/util/net/listen.h
new file mode 100644
index 0000000..415db1e
--- /dev/null
+++ b/util/net/listen.h
@@ -0,0 +1,190 @@
+// listen.h
+
+/* Copyright 2009 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "sock.h"
+
+namespace mongo {
+
+ class MessagingPort;
+
+ /**
+ * listens on a port (optionally also on an ssl port) and hands each accepted connection
+ * to accepted(). subclasses override one of the accepted() overloads.
+ * one Listener can also be registered as the process-wide source of a rough elapsed-time clock.
+ */
+ class Listener : boost::noncopyable {
+ public:
+
+ /**
+ * @param name prefix for the "waiting for connections" log line (may be empty)
+ * @param ip comma separated list of addresses to bind; empty means all
+ * @param logConnect log each accepted connection (unless cmdLine.quiet is set)
+ */
+ Listener(const string& name, const string &ip, int port, bool logConnect=true );
+
+ virtual ~Listener();
+
+#ifdef MONGO_SSL
+ /**
+ * make this an ssl socket
+ * ownership of SSLManager remains with the caller
+ */
+ void secure( SSLManager* manager );
+
+ /** keep the normal port and additionally listen on additionalPort with ssl */
+ void addSecurePort( SSLManager* manager , int additionalPort );
+#endif
+
+ void initAndListen(); // never returns unless error (start a thread)
+
+ /* spawn a thread, etc., then return */
+ virtual void accepted(Socket socket);
+ virtual void accepted(MessagingPort *mp);
+
+ const int _port;
+
+ /**
+ * @return a rough estimate of elapsed time since the server started
+ */
+ long long getMyElapsedTimeMillis() const { return _elapsedTime; }
+
+ /** make this listener the one getElapsedTimeMillis() reads from */
+ void setAsTimeTracker() {
+ _timeTracker = this;
+ }
+
+ static const Listener* getTimeTracker() {
+ return _timeTracker;
+ }
+
+ /** @return the time tracker's elapsed millis, or 0 when no tracker is registered */
+ static long long getElapsedTimeMillis() {
+ if ( _timeTracker )
+ return _timeTracker->getMyElapsedTimeMillis();
+
+ // should this assert or throw? seems like callers may not expect to get zero back, certainly not forever.
+ return 0;
+ }
+
+ private:
+ string _name;
+ string _ip;
+ bool _logConnect;
+ long long _elapsedTime; // advanced only by the select loop in initAndListen()
+
+#ifdef MONGO_SSL
+ SSLManager* _ssl; // not owned; 0 means no ssl
+ int _sslPort; // 0 means ssl (if any) applies to _port itself
+#endif
+
+ /**
+ * @return true iff everything went ok
+ */
+ bool _setupSockets( const vector<SockAddr>& mine , vector<int>& socks );
+
+ void _logListen( int port , bool ssl );
+
+ static const Listener* _timeTracker;
+
+ // subclasses return true to also get a unix domain socket (unless cmdLine.noUnixSocket is set)
+ virtual bool useUnixSockets() const { return false; }
+ };
+
+ /**
+ * keep track of elapsed time
+ * after a set amount of time, tells you to do something
+ * only in this file because depends on Listener
+ */
+ class ElapsedTracker {
+ public:
+ /**
+ * @param hitsBetweenMarks ping() returns true on every hitsBetweenMarks-th call
+ * @param msBetweenMarks ping() also returns true once more than this many ms
+ * (per Listener::getElapsedTimeMillis()) have passed since the last mark
+ * NOTE(review): hitsBetweenMarks == 0 would make the modulo in ping() divide by zero
+ */
+ ElapsedTracker( int hitsBetweenMarks , int msBetweenMarks )
+ : _h( hitsBetweenMarks ) , _ms( msBetweenMarks ) , _pings(0) {
+ _last = Listener::getElapsedTimeMillis();
+ }
+
+ /**
+ * call this for every iteration
+ * returns true if one of the triggers has gone off
+ */
+ bool ping() {
+ // count based trigger; also restarts the time based one
+ if ( ( ++_pings % _h ) == 0 ) {
+ _last = Listener::getElapsedTimeMillis();
+ return true;
+ }
+
+ // time based trigger
+ long long now = Listener::getElapsedTimeMillis();
+ if ( now - _last > _ms ) {
+ _last = now;
+ return true;
+ }
+
+ return false;
+ }
+
+ private:
+ int _h; // hits between marks
+ int _ms; // milliseconds between marks
+
+ unsigned long long _pings; // number of ping() calls so far
+
+ long long _last; // Listener elapsed time at the last mark
+
+ };
+
+ /**
+ * registry of the listening sockets (and unix socket files) opened by this process,
+ * so they can all be closed / removed in one go via closeAll().
+ * every member function takes _mutex before touching the sets.
+ */
+ class ListeningSockets {
+ public:
+ ListeningSockets()
+ : _mutex("ListeningSockets")
+ , _sockets( new set<int>() )
+ , _socketPaths( new set<string>() )
+ { }
+ /** remember a listening socket */
+ void add( int sock ) {
+ scoped_lock lk( _mutex );
+ _sockets->insert( sock );
+ }
+ /** remember the file backing a unix domain socket */
+ void addPath( string path ) {
+ scoped_lock lk( _mutex );
+ _socketPaths->insert( path );
+ }
+ void remove( int sock ) {
+ scoped_lock lk( _mutex );
+ _sockets->erase( sock );
+ }
+ /** close every registered socket and remove every registered socket file */
+ void closeAll() {
+ set<int>* sockets;
+ set<string>* paths;
+
+ {
+ // swap in fresh sets under the lock; the closing and logging below run without it
+ scoped_lock lk( _mutex );
+ sockets = _sockets;
+ _sockets = new set<int>();
+ paths = _socketPaths;
+ _socketPaths = new set<string>();
+ }
+
+ for ( set<int>::iterator i=sockets->begin(); i!=sockets->end(); i++ ) {
+ int sock = *i;
+ log() << "closing listening socket: " << sock << endl;
+ closesocket( sock );
+ }
+
+ for ( set<string>::iterator i=paths->begin(); i!=paths->end(); i++ ) {
+ string path = *i;
+ log() << "removing socket file: " << path << endl;
+ ::remove( path.c_str() );
+ }
+
+ // the swapped-out sets are only reachable from here now; free them instead of leaking
+ delete sockets;
+ delete paths;
+ }
+ static ListeningSockets* get();
+ private:
+ mongo::mutex _mutex;
+ set<int>* _sockets;
+ set<string>* _socketPaths; // for unix domain sockets
+ static ListeningSockets* _instance;
+ };
+
+
+ extern TicketHolder connTicketHolder;
+
+}
diff --git a/util/net/message.cpp b/util/net/message.cpp
new file mode 100644
index 0000000..a84e5c4
--- /dev/null
+++ b/util/net/message.cpp
@@ -0,0 +1,64 @@
+// message.cpp
+
+/* Copyright 2009 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "pch.h"
+
+#include <fcntl.h>
+#include <errno.h>
+#include <time.h>
+
+#include "message.h"
+#include "message_port.h"
+#include "listen.h"
+
+#include "../goodies.h"
+#include "../../client/dbclient.h"
+
+namespace mongo {
+
+ /**
+ * write this message to p. an empty message sends nothing.
+ * @param context passed through to MessagingPort::send()
+ */
+ void Message::send( MessagingPort &p, const char *context ) {
+ if ( empty() ) {
+ return;
+ }
+ if ( _buf != 0 ) {
+ // message held in _buf: send _buf->len bytes starting at _buf
+ p.send( (char*)_buf, _buf->len, context );
+ }
+ else {
+ // otherwise the contents live in _data (presumably a list of buffer pieces -- see message.h)
+ p.send( _data, context );
+ }
+ }
+
+ MSGID NextMsgId;
+
+ /*struct MsgStart {
+ MsgStart() {
+ NextMsgId = (((unsigned) time(0)) << 16) ^ curTimeMillis();
+ assert(MsgDataHeaderSize == 16);
+ }
+ } msgstart;*/
+
+ // hand out a message id by post-incrementing the global NextMsgId counter (an AtomicUInt)
+ MSGID nextMessageId() {
+ MSGID msgid = NextMsgId++;
+ return msgid;
+ }
+
+ // @return true for dbQuery and dbGetMore, false for every other op
+ bool doesOpGetAResponse( int op ) {
+ return op == dbQuery || op == dbGetMore;
+ }
+
+
+} // namespace mongo
diff --git a/util/message.h b/util/net/message.h
index f114445..16da5d6 100644
--- a/util/message.h
+++ b/util/net/message.h
@@ -1,4 +1,4 @@
-// Message.h
+// message.h
/* Copyright 2009 10gen Inc.
*
@@ -17,154 +17,17 @@
#pragma once
-#include "../util/sock.h"
-#include "../bson/util/atomic_int.h"
+#include "sock.h"
+#include "../../bson/util/atomic_int.h"
#include "hostandport.h"
namespace mongo {
- extern bool noUnixSocket;
-
class Message;
class MessagingPort;
class PiggyBackData;
- typedef AtomicUInt MSGID;
-
- class Listener : boost::noncopyable {
- public:
- Listener(const string &ip, int p, bool logConnect=true ) : _port(p), _ip(ip), _logConnect(logConnect), _elapsedTime(0) { }
- virtual ~Listener() {
- if ( _timeTracker == this )
- _timeTracker = 0;
- }
- void initAndListen(); // never returns unless error (start a thread)
-
- /* spawn a thread, etc., then return */
- virtual void accepted(int sock, const SockAddr& from);
- virtual void accepted(MessagingPort *mp) {
- assert(!"You must overwrite one of the accepted methods");
- }
-
- const int _port;
-
- /**
- * @return a rough estimate of elepased time since the server started
- */
- long long getMyElapsedTimeMillis() const { return _elapsedTime; }
-
- void setAsTimeTracker() {
- _timeTracker = this;
- }
-
- static const Listener* getTimeTracker() {
- return _timeTracker;
- }
-
- static long long getElapsedTimeMillis() {
- if ( _timeTracker )
- return _timeTracker->getMyElapsedTimeMillis();
-
- // should this assert or throw? seems like callers may not expect to get zero back, certainly not forever.
- return 0;
- }
-
- private:
- string _ip;
- bool _logConnect;
- long long _elapsedTime;
-
- static const Listener* _timeTracker;
- };
-
- class AbstractMessagingPort : boost::noncopyable {
- public:
- virtual ~AbstractMessagingPort() { }
- virtual void reply(Message& received, Message& response, MSGID responseTo) = 0; // like the reply below, but doesn't rely on received.data still being available
- virtual void reply(Message& received, Message& response) = 0;
-
- virtual HostAndPort remote() const = 0;
- virtual unsigned remotePort() const = 0;
-
- private:
- int _clientId;
- };
-
- class MessagingPort : public AbstractMessagingPort {
- public:
- MessagingPort(int sock, const SockAddr& farEnd);
-
- // in some cases the timeout will actually be 2x this value - eg we do a partial send,
- // then the timeout fires, then we try to send again, then the timeout fires again with
- // no data sent, then we detect that the other side is down
- MessagingPort(double so_timeout = 0, int logLevel = 0 );
-
- virtual ~MessagingPort();
-
- void shutdown();
-
- bool connect(SockAddr& farEnd);
-
- /* it's assumed if you reuse a message object, that it doesn't cross MessagingPort's.
- also, the Message data will go out of scope on the subsequent recv call.
- */
- bool recv(Message& m);
- void reply(Message& received, Message& response, MSGID responseTo);
- void reply(Message& received, Message& response);
- bool call(Message& toSend, Message& response);
-
- void say(Message& toSend, int responseTo = -1);
-
- /**
- * this is used for doing 'async' queries
- * instead of doing call( to , from )
- * you would do
- * say( to )
- * recv( from )
- * Note: if you fail to call recv and someone else uses this port,
- * horrible things will happend
- */
- bool recv( const Message& sent , Message& response );
-
- void piggyBack( Message& toSend , int responseTo = -1 );
-
- virtual unsigned remotePort() const;
- virtual HostAndPort remote() const;
-
- // send len or throw SocketException
- void send( const char * data , int len, const char *context );
- void send( const vector< pair< char *, int > > &data, const char *context );
-
- // recv len or throw SocketException
- void recv( char * data , int len );
-
- int unsafe_recv( char *buf, int max );
- void clearCounters() { _bytesIn = 0; _bytesOut = 0; }
- long long getBytesIn() const { return _bytesIn; }
- long long getBytesOut() const { return _bytesOut; }
- private:
- int sock;
- PiggyBackData * piggyBackData;
-
- long long _bytesIn;
- long long _bytesOut;
-
- // this is the parsed version of farEnd
- // mutable because its initialized only on call to remote()
- mutable HostAndPort _farEndParsed;
-
- public:
- SockAddr farEnd;
- double _timeout;
- int _logLevel; // passed to log() when logging errors
-
- static void closeAllSockets(unsigned tagMask = 0xffffffff);
-
- /* ports can be tagged with various classes. see closeAllSockets(tag). defaults to 0. */
- unsigned tag;
-
- friend class PiggyBackData;
- };
+ typedef AtomicUInt MSGID;
enum Operations {
opReply = 1, /* reply. responseTo is set. */
@@ -425,17 +288,9 @@ namespace mongo {
return _freeIt;
}
- void send( MessagingPort &p, const char *context ) {
- if ( empty() ) {
- return;
- }
- if ( _buf != 0 ) {
- p.send( (char*)_buf, _buf->len, context );
- }
- else {
- p.send( _data, context );
- }
- }
+ void send( MessagingPort &p, const char *context );
+
+ string toString() const;
private:
void _setData( MsgData *d, bool freeIt ) {
@@ -450,59 +305,8 @@ namespace mongo {
bool _freeIt;
};
- class SocketException : public DBException {
- public:
- const enum Type { CLOSED , RECV_ERROR , SEND_ERROR, RECV_TIMEOUT, SEND_TIMEOUT, FAILED_STATE, CONNECT_ERROR } _type;
-
- SocketException( Type t , string server="" , int code = 9001 , string extra="" ) : DBException( "socket exception" , code ) , _type(t) , _server(server), _extra(extra){ }
- virtual ~SocketException() throw() {}
-
- bool shouldPrint() const { return _type != CLOSED; }
- virtual string toString() const;
-
- private:
- string _server;
- string _extra;
- };
MSGID nextMessageId();
- extern TicketHolder connTicketHolder;
-
- class ElapsedTracker {
- public:
- ElapsedTracker( int hitsBetweenMarks , int msBetweenMarks )
- : _h( hitsBetweenMarks ) , _ms( msBetweenMarks ) , _pings(0) {
- _last = Listener::getElapsedTimeMillis();
- }
-
- /**
- * call this for every iteration
- * returns true if one of the triggers has gone off
- */
- bool ping() {
- if ( ( ++_pings % _h ) == 0 ) {
- _last = Listener::getElapsedTimeMillis();
- return true;
- }
-
- long long now = Listener::getElapsedTimeMillis();
- if ( now - _last > _ms ) {
- _last = now;
- return true;
- }
-
- return false;
- }
-
- private:
- int _h;
- int _ms;
-
- unsigned long long _pings;
-
- long long _last;
-
- };
} // namespace mongo
diff --git a/util/net/message_port.cpp b/util/net/message_port.cpp
new file mode 100644
index 0000000..9abfaf7
--- /dev/null
+++ b/util/net/message_port.cpp
@@ -0,0 +1,298 @@
+// message_port.cpp
+
+/* Copyright 2009 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "pch.h"
+
+#include <fcntl.h>
+#include <errno.h>
+#include <time.h>
+
+#include "message.h"
+#include "message_port.h"
+#include "listen.h"
+
+#include "../goodies.h"
+#include "../background.h"
+#include "../time_support.h"
+#include "../../db/cmdline.h"
+#include "../../client/dbclient.h"
+
+
+#ifndef _WIN32
+# ifndef __sunos__
+# include <ifaddrs.h>
+# endif
+# include <sys/resource.h>
+# include <sys/stat.h>
+#else
+
+// errno doesn't work for winsock.
+#undef errno
+#define errno WSAGetLastError()
+
+#endif
+
+namespace mongo {
+
+
+// if you want trace output:
+#define mmm(x)
+
+ /* messagingport -------------------------------------------------------------- */
+
+    /**
+     * Staging buffer for small outgoing messages on one MessagingPort.
+     * Messages are copied in with append() and written out together by flush().
+     */
+    class PiggyBackData {
+    public:
+        /** Allocates the 1300 byte staging buffer. port is used by flush() and is not deleted here. */
+        PiggyBackData( MessagingPort * port ) {
+            _port = port;
+            _buf = new char[1300];
+            _cur = _buf;
+        }
+
+        ~PiggyBackData() {
+            // flush() sends on the port and may throw, so only it goes inside the guard;
+            // the buffer below is then released even when that last send fails.
+            DESTRUCTOR_GUARD (
+                flush();
+            );
+            // free through the pointer new[] returned, never through the write cursor
+            delete[]( _buf );
+        }
+
+        /** Copies m's single data buffer into the staging area, flushing first if it would not fit. */
+        void append( Message& m ) {
+            assert( m.header()->len <= 1300 );
+
+            if ( len() + m.header()->len > 1300 )
+                flush();
+
+            memcpy( _cur , m.singleData() , m.header()->len );
+            _cur += m.header()->len;
+        }
+
+        /** Sends everything staged so far (if anything) and rewinds the cursor. */
+        void flush() {
+            if ( _buf == _cur )
+                return;
+
+            _port->send( _buf , len(), "flush" );
+            _cur = _buf;
+        }
+
+        /** @return number of staged bytes that have not been sent yet */
+        int len() const { return _cur - _buf; }
+
+    private:
+        MessagingPort* _port;
+        char * _buf;   // start of the allocation
+        char * _cur;   // next free byte inside _buf
+    };
+
+    /** Mutex-protected registry of live MessagingPort objects. */
+    class Ports {
+        set<MessagingPort*> ports;
+        mongo::mutex m;
+    public:
+        Ports() : ports(), m("Ports") {}
+
+        /** Shuts down every registered port whose tag shares no bit with skip_mask. */
+        void closeAll(unsigned skip_mask) {
+            scoped_lock lk(m);
+            for ( set<MessagingPort*>::iterator it = ports.begin(); it != ports.end(); ++it ) {
+                MessagingPort* port = *it;
+                if ( ( port->tag & skip_mask ) == 0 )
+                    port->shutdown();
+            }
+        }
+
+        /** Adds p to the registry. */
+        void insert(MessagingPort* p) {
+            scoped_lock lk(m);
+            ports.insert(p);
+        }
+
+        /** Removes p from the registry. */
+        void erase(MessagingPort* p) {
+            scoped_lock lk(m);
+            ports.erase(p);
+        }
+    };
+
+ // The registry is allocated with "new" and never deleted so that it is still
+ // around when other automatic global vars are being destructed during termination.
+ Ports& ports = *(new Ports());
+
+    /** Forwards to the global registry: shuts down every registered port not excluded by mask. */
+    void MessagingPort::closeAllSockets(unsigned mask) {
+        Ports& registry = ports;
+        registry.closeAll( mask );
+    }
+
+    /** Wraps an existing descriptor and registers the new port in the global registry. */
+    MessagingPort::MessagingPort(int fd, const SockAddr& remote)
+        : Socket( fd , remote ) {
+        piggyBackData = 0;
+        ports.insert(this);
+    }
+
+    /** Creates an unconnected port with the given timeout and log level, and registers it. */
+    MessagingPort::MessagingPort( double timeout, int ll )
+        : Socket( timeout, ll ) , piggyBackData( 0 ) {
+        ports.insert(this);
+    }
+
+    /**
+     * Builds a port from an existing Socket.
+     * The port is registered like in the other constructors so that
+     * closeAllSockets() can reach it; the destructor already erases it.
+     */
+    MessagingPort::MessagingPort( Socket& sock )
+        : Socket( sock ) , piggyBackData( 0 ) {
+        ports.insert(this);
+    }
+
+    /** Closes the underlying socket. */
+    void MessagingPort::shutdown() {
+        this->close();
+    }
+
+    /** Releases any piggy-back buffer, closes the socket and leaves the global registry. */
+    MessagingPort::~MessagingPort() {
+        delete piggyBackData;   // deleting a null pointer is a no-op
+        shutdown();
+        ports.erase(this);
+    }
+
+    /**
+     * Reads one complete message from the socket into m.
+     *
+     * The first 4 bytes are the total length. A length of -1 is answered with an
+     * endian marker and the read is retried; an HTTP "GET " gets a plain-text hint
+     * and the call fails; any other length outside 16..48000000 is rejected.
+     *
+     * @return true when m now owns a freshly received message; false on an invalid
+     *         length, an http request, or a SocketException (m is reset in that case)
+     */
+    bool MessagingPort::recv(Message& m) {
+        try {
+again:
+            mmm( log() << "* recv() sock:" << this->sock << endl; )
+            int len = -1;
+
+            char *lenbuf = (char *) &len;
+            int lft = 4;
+            Socket::recv( lenbuf, lft );
+
+            if ( len < 16 || len > 48000000 ) { // messages must be large enough for headers
+                if ( len == -1 ) {
+                    // Endian check from the client, after connecting, to see what mode server is running in.
+                    unsigned foo = 0x10203040;
+                    send( (char *) &foo, 4, "endian" );
+                    goto again;
+                }
+
+                if ( len == 542393671 ) {
+                    // an http GET (542393671 is 0x20544547, i.e. the bytes "GET " read as a little-endian int)
+                    log(_logLevel) << "looks like you're trying to access db over http on native driver port. please add 1000 for webserver" << endl;
+                    string msg = "You are trying to access MongoDB on the native driver port. For http diagnostic access, add 1000 to the port number\n";
+                    stringstream ss;
+                    ss << "HTTP/1.0 200 OK\r\nConnection: close\r\nContent-Type: text/plain\r\nContent-Length: " << msg.size() << "\r\n\r\n" << msg;
+                    string s = ss.str();
+                    send( s.c_str(), s.size(), "http" );
+                    return false;
+                }
+                // the length may be too small as well as too large, so report the accepted range
+                log(0) << "recv(): message len " << len << " is invalid (must be between 16 and 48000000)" << endl;
+                return false;
+            }
+
+            // round the allocation up to a multiple of 1024 bytes
+            int z = (len+1023)&0xfffffc00;
+            assert(z>=len);
+            MsgData *md = (MsgData *) malloc(z);
+            assert(md);
+            md->len = len;
+
+            // the length field is already known; read the remainder starting at the id field
+            char *p = (char *) &md->id;
+            int left = len -4;
+
+            try {
+                Socket::recv( p, left );
+            }
+            catch (...) {
+                // m does not own md yet, so release it before propagating
+                free(md);
+                throw;
+            }
+
+            m.setData(md, true);
+            return true;
+
+        }
+        catch ( const SocketException & e ) {
+            log(_logLevel + (e.shouldPrint() ? 0 : 1) ) << "SocketException: remote: " << remote() << " error: " << e << endl;
+            m.reset();
+            return false;
+        }
+    }
+
+    /** Sends response as the answer to received, using received's message id as responseTo. */
+    void MessagingPort::reply(Message& received, Message& response) {
+        const int requestId = received.header()->id;
+        say( response, requestId );
+    }
+
+    /** Sends response with an explicitly supplied responseTo id; received itself is not read. */
+    void MessagingPort::reply(Message& received, Message& response, MSGID responseTo) {
+        say( response, responseTo );
+    }
+
+    /** Sends toSend and then waits for the matching response. @return false if receiving failed */
+    bool MessagingPort::call(Message& toSend, Message& response) {
+        mmm( log() << "*call()" << endl; )
+        say(toSend);
+        const bool gotReply = recv( toSend , response );
+        return gotReply;
+    }
+
+    /**
+     * Receives messages until one arrives whose responseTo equals toSend's id.
+     * A mismatching response is logged and then trips assert(false).
+     * @return true once the matching response is in response, false if a receive failed
+     */
+    bool MessagingPort::recv( const Message& toSend , Message& response ) {
+        for ( ;; ) {
+            if ( ! recv(response) )
+                return false;
+
+            if ( response.header()->responseTo == toSend.header()->id ) {
+                mmm( log() << "*call() end" << endl; )
+                return true;
+            }
+
+            error() << "MessagingPort::call() wrong id got:" << hex << (unsigned)response.header()->responseTo << " expect:" << (unsigned)toSend.header()->id << '\n'
+                    << dec
+                    << " toSend op: " << (unsigned)toSend.operation() << '\n'
+                    << " response msgid:" << (unsigned)response.header()->id << '\n'
+                    << " response len: " << (unsigned)response.header()->len << '\n'
+                    << " response op: " << response.operation() << '\n'
+                    << " remote: " << remoteString() << endl;
+            assert(false);
+            response.reset();
+        }
+    }
+
+    /**
+     * Stamps toSend with a fresh message id and the given responseTo, then sends it.
+     * If piggy-backed data is pending, it goes out first: together with toSend when
+     * both fit in 1300 bytes, otherwise on its own ahead of toSend.
+     */
+    void MessagingPort::say(Message& toSend, int responseTo) {
+        assert( !toSend.empty() );
+        mmm( log() << "* say() sock:" << this->sock << " thr:" << GetCurrentThreadId() << endl; )
+        toSend.header()->id = nextMessageId();
+        toSend.header()->responseTo = responseTo;
+
+        if ( piggyBackData && piggyBackData->len() ) {
+            mmm( log() << "* have piggy back" << endl; )
+            const bool fitsInPacket = ( piggyBackData->len() + toSend.header()->len ) <= 1300;
+            if ( fitsInPacket ) {
+                piggyBackData->append( toSend );
+                piggyBackData->flush();
+                return;
+            }
+            // won't fit in a packet - so just send the pending data off
+            piggyBackData->flush();
+        }
+
+        toSend.send( *this, "say" );
+    }
+
+    /**
+     * Queues toSend so it goes out with a later message instead of immediately.
+     * Messages longer than 1300 bytes are not queued; they are sent right away,
+     * with the caller's responseTo.
+     */
+    void MessagingPort::piggyBack( Message& toSend , int responseTo ) {
+
+        if ( toSend.header()->len > 1300 ) {
+            // not worth saving because its almost an entire packet;
+            // pass responseTo along so the immediate send does not lose it
+            say( toSend , responseTo );
+            return;
+        }
+
+        // we're going to be storing this, so need to set it up
+        toSend.header()->id = nextMessageId();
+        toSend.header()->responseTo = responseTo;
+
+        // the staging buffer is created on first use
+        if ( ! piggyBackData )
+            piggyBackData = new PiggyBackData( this );
+
+        piggyBackData->append( toSend );
+    }
+
+    /** Returns the remote endpoint, parsing it from remoteAddr() while the cached value has no port. */
+    HostAndPort MessagingPort::remote() const {
+        if ( _remoteParsed.hasPort() )
+            return _remoteParsed;
+
+        _remoteParsed = HostAndPort( remoteAddr() );
+        return _remoteParsed;
+    }
+
+
+} // namespace mongo
diff --git a/util/net/message_port.h b/util/net/message_port.h
new file mode 100644
index 0000000..22ecafe
--- /dev/null
+++ b/util/net/message_port.h
@@ -0,0 +1,107 @@
+// message_port.h
+
+/* Copyright 2009 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "sock.h"
+#include "message.h"
+
+namespace mongo {
+
+ class MessagingPort;
+ class PiggyBackData;
+
+ typedef AtomicUInt MSGID;
+
+ /**
+  * Interface of a port that can send a reply to a received Message and
+  * report its remote endpoint. Instances cannot be copied (boost::noncopyable).
+  */
+ class AbstractMessagingPort : boost::noncopyable {
+ public:
+ AbstractMessagingPort() : tag(0) {}
+ virtual ~AbstractMessagingPort() { }
+ virtual void reply(Message& received, Message& response, MSGID responseTo) = 0; // like the reply below, but doesn't rely on received.data still being available
+ virtual void reply(Message& received, Message& response) = 0;
+
+ // remote endpoint of this port
+ virtual HostAndPort remote() const = 0;
+ virtual unsigned remotePort() const = 0;
+
+ private:
+
+ public:
+ // TODO make this private with some helpers
+
+ /* ports can be tagged with various classes. see closeAllSockets(tag). defaults to 0. */
+ unsigned tag;
+
+ };
+
+ /**
+  * A Socket that exchanges Message objects and implements AbstractMessagingPort.
+  */
+ class MessagingPort : public AbstractMessagingPort , public Socket {
+ public:
+ MessagingPort(int fd, const SockAddr& remote);
+
+ // in some cases the timeout will actually be 2x this value - eg we do a partial send,
+ // then the timeout fires, then we try to send again, then the timeout fires again with
+ // no data sent, then we detect that the other side is down
+ MessagingPort(double so_timeout = 0, int logLevel = 0 );
+
+ MessagingPort(Socket& socket);
+
+ virtual ~MessagingPort();
+
+ void shutdown();
+
+ /* it's assumed if you reuse a message object, that it doesn't cross MessagingPort's.
+ also, the Message data will go out of scope on the subsequent recv call.
+ */
+ bool recv(Message& m);
+ void reply(Message& received, Message& response, MSGID responseTo);
+ void reply(Message& received, Message& response);
+ bool call(Message& toSend, Message& response);
+
+ void say(Message& toSend, int responseTo = -1);
+
+ /**
+ * this is used for doing 'async' queries
+ * instead of doing call( to , from )
+ * you would do
+ * say( to )
+ * recv( from )
+ * Note: if you fail to call recv and someone else uses this port,
+ * horrible things will happen
+ */
+ bool recv( const Message& sent , Message& response );
+
+ void piggyBack( Message& toSend , int responseTo = -1 );
+
+ unsigned remotePort() const { return Socket::remotePort(); }
+ virtual HostAndPort remote() const;
+
+
+ private:
+
+ // buffer of queued piggy-back messages; a null pointer until one is queued
+ PiggyBackData * piggyBackData;
+
+ // this is the parsed version of remote
+ // mutable because its initialized only on call to remote()
+ mutable HostAndPort _remoteParsed;
+
+ public:
+ // tagMask defaults to all bits set
+ static void closeAllSockets(unsigned tagMask = 0xffffffff);
+
+ friend class PiggyBackData;
+ };
+
+
+} // namespace mongo
diff --git a/util/message_server.h b/util/net/message_server.h
index defae0b..ae77b97 100644
--- a/util/message_server.h
+++ b/util/net/message_server.h
@@ -22,16 +22,28 @@
#pragma once
-#include "../pch.h"
+#include "../../pch.h"
namespace mongo {
class MessageHandler {
public:
virtual ~MessageHandler() {}
-
+
+ /**
+ * called once when a socket is connected
+ */
virtual void connected( AbstractMessagingPort* p ) = 0;
+
+ /**
+ * called every time a message comes in
+ * handler is responsible for responding to client
+ */
virtual void process( Message& m , AbstractMessagingPort* p , LastError * err ) = 0;
+
+ /**
+ * called once when a socket is disconnected
+ */
virtual void disconnected( AbstractMessagingPort* p ) = 0;
};
diff --git a/util/message_server_asio.cpp b/util/net/message_server_asio.cpp
index 0c6a7d9..0c6a7d9 100644
--- a/util/message_server_asio.cpp
+++ b/util/net/message_server_asio.cpp
diff --git a/util/message_server_port.cpp b/util/net/message_server_port.cpp
index 409b0c7..ca0b13d 100644
--- a/util/message_server_port.cpp
+++ b/util/net/message_server_port.cpp
@@ -20,13 +20,15 @@
#ifndef USE_ASIO
#include "message.h"
+#include "message_port.h"
#include "message_server.h"
+#include "listen.h"
-#include "../db/cmdline.h"
-#include "../db/lasterror.h"
-#include "../db/stats/counters.h"
+#include "../../db/cmdline.h"
+#include "../../db/lasterror.h"
+#include "../../db/stats/counters.h"
-#ifdef __linux__
+#ifdef __linux__ // TODO: consider making this ifndef _WIN32
# include <sys/resource.h>
#endif
@@ -38,13 +40,15 @@ namespace mongo {
void threadRun( MessagingPort * inPort) {
TicketHolderReleaser connTicketReleaser( &connTicketHolder );
-
- assert( inPort );
setThreadName( "conn" );
-
+
+ assert( inPort );
+ inPort->setLogLevel(1);
scoped_ptr<MessagingPort> p( inPort );
+ p->postFork();
+
string otherSide;
Message m;
@@ -52,11 +56,11 @@ namespace mongo {
LastError * le = new LastError();
lastError.reset( le ); // lastError now has ownership
- otherSide = p->farEnd.toString();
+ otherSide = p->remoteString();
handler->connected( p.get() );
- while ( 1 ) {
+ while ( ! inShutdown() ) {
m.reset();
p->clearCounters();
@@ -71,14 +75,25 @@ namespace mongo {
networkCounter.hit( p->getBytesIn() , p->getBytesOut() );
}
}
- catch ( const SocketException& ) {
- log() << "unclean socket shutdown from: " << otherSide << endl;
+ catch ( AssertionException& e ) {
+ log() << "AssertionException handling request, closing client connection: " << e << endl;
+ p->shutdown();
+ }
+ catch ( SocketException& e ) {
+ log() << "SocketException handling request, closing client connection: " << e << endl;
+ p->shutdown();
}
- catch ( const std::exception& e ) {
- problem() << "uncaught exception (" << e.what() << ")(" << demangleName( typeid(e) ) <<") in PortMessageServer::threadRun, closing connection" << endl;
+ catch ( const ClockSkewException & ) {
+ log() << "ClockSkewException - shutting down" << endl;
+ exitCleanly( EXIT_CLOCK_SKEW );
+ }
+ catch ( std::exception &e ) {
+ error() << "Uncaught std::exception: " << e.what() << ", terminating" << endl;
+ dbexit( EXIT_UNCAUGHT );
}
catch ( ... ) {
- problem() << "uncaught exception in PortMessageServer::threadRun, closing connection" << endl;
+ error() << "Uncaught exception, terminating" << endl;
+ dbexit( EXIT_UNCAUGHT );
}
handler->disconnected( p.get() );
@@ -89,7 +104,7 @@ namespace mongo {
class PortMessageServer : public MessageServer , public Listener {
public:
PortMessageServer( const MessageServer::Options& opts, MessageHandler * handler ) :
- Listener( opts.ipList, opts.port ) {
+ Listener( "" , opts.ipList, opts.port ) {
uassert( 10275 , "multiple PortMessageServer not supported" , ! pms::handler );
pms::handler = handler;
@@ -116,19 +131,19 @@ namespace mongo {
pthread_attr_init(&attrs);
pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED);
- static const size_t STACK_SIZE = 4*1024*1024;
+ static const size_t STACK_SIZE = 1024*1024; // if we change this we need to update the warning
struct rlimit limits;
- assert(getrlimit(RLIMIT_STACK, &limits) == 0);
+ verify(15887, getrlimit(RLIMIT_STACK, &limits) == 0);
if (limits.rlim_cur > STACK_SIZE) {
pthread_attr_setstacksize(&attrs, (DEBUG_BUILD
? (STACK_SIZE / 2)
: STACK_SIZE));
- }
- else if (limits.rlim_cur < 1024*1024) {
- warning() << "Stack size set to " << (limits.rlim_cur/1024) << "KB. We suggest at least 1MB" << endl;
+ } else if (limits.rlim_cur < 1024*1024) {
+ warning() << "Stack size set to " << (limits.rlim_cur/1024) << "KB. We suggest 1MB" << endl;
}
+
pthread_t thread;
int failed = pthread_create(&thread, &attrs, (void*(*)(void*)) &pms::threadRun, p);
@@ -149,6 +164,16 @@ namespace mongo {
sleepmillis(2);
}
+ catch ( ... ) {
+ connTicketHolder.release();
+ log() << "unknown error accepting new socket" << endl;
+
+ p->shutdown();
+ delete p;
+
+ sleepmillis(2);
+ }
+
}
virtual void setAsTimeTracker() {
@@ -159,6 +184,7 @@ namespace mongo {
initAndListen();
}
+ virtual bool useUnixSockets() const { return true; }
};
diff --git a/util/miniwebserver.cpp b/util/net/miniwebserver.cpp
index e700112..0793100 100644
--- a/util/miniwebserver.cpp
+++ b/util/net/miniwebserver.cpp
@@ -17,14 +17,14 @@
#include "pch.h"
#include "miniwebserver.h"
-#include "hex.h"
+#include "../hex.h"
-#include <pcrecpp.h>
+#include "pcrecpp.h"
namespace mongo {
- MiniWebServer::MiniWebServer(const string &ip, int port)
- : Listener(ip, port, false)
+ MiniWebServer::MiniWebServer(const string& name, const string &ip, int port)
+ : Listener(name, ip, port, false)
{}
string MiniWebServer::parseURL( const char * buf ) {
@@ -108,17 +108,18 @@ namespace mongo {
return false;
}
- void MiniWebServer::accepted(int s, const SockAddr &from) {
- setSockTimeouts(s, 8);
+ void MiniWebServer::accepted(Socket sock) {
+ sock.postFork();
+ sock.setTimeout(8);
char buf[4096];
int len = 0;
while ( 1 ) {
int left = sizeof(buf) - 1 - len;
if( left == 0 )
break;
- int x = ::recv(s, buf + len, left, 0);
+ int x = sock.unsafe_recv( buf + len , left );
if ( x <= 0 ) {
- closesocket(s);
+ sock.close();
return;
}
len += x;
@@ -134,7 +135,7 @@ namespace mongo {
vector<string> headers;
try {
- doRequest(buf, parseURL( buf ), responseMsg, responseCode, headers, from);
+ doRequest(buf, parseURL( buf ), responseMsg, responseCode, headers, sock.remoteAddr() );
}
catch ( std::exception& e ) {
responseCode = 500;
@@ -165,8 +166,8 @@ namespace mongo {
ss << responseMsg;
string response = ss.str();
- ::send(s, response.c_str(), response.size(), 0);
- closesocket(s);
+ sock.send( response.c_str(), response.size() , "http response" );
+ sock.close();
}
string MiniWebServer::getHeader( const char * req , string wanted ) {
diff --git a/util/miniwebserver.h b/util/net/miniwebserver.h
index b385afc..1fb6b3f 100644
--- a/util/miniwebserver.h
+++ b/util/net/miniwebserver.h
@@ -17,15 +17,17 @@
#pragma once
-#include "../pch.h"
+#include "../../pch.h"
#include "message.h"
-#include "../db/jsobj.h"
+#include "message_port.h"
+#include "listen.h"
+#include "../../db/jsobj.h"
namespace mongo {
class MiniWebServer : public Listener {
public:
- MiniWebServer(const string &ip, int _port);
+ MiniWebServer(const string& name, const string &ip, int _port);
virtual ~MiniWebServer() {}
virtual void doRequest(
@@ -51,7 +53,7 @@ namespace mongo {
static string urlDecode(string s) {return urlDecode(s.c_str());}
private:
- void accepted(int s, const SockAddr &from);
+ void accepted(Socket socket);
static bool fullReceive( const char *buf );
};
diff --git a/util/net/sock.cpp b/util/net/sock.cpp
new file mode 100644
index 0000000..69c42f2
--- /dev/null
+++ b/util/net/sock.cpp
@@ -0,0 +1,713 @@
+// @file sock.cpp
+
+/* Copyright 2009 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "pch.h"
+#include "sock.h"
+#include "../background.h"
+
+#if !defined(_WIN32)
+# include <sys/socket.h>
+# include <sys/types.h>
+# include <sys/socket.h>
+# include <sys/un.h>
+# include <netinet/in.h>
+# include <netinet/tcp.h>
+# include <arpa/inet.h>
+# include <errno.h>
+# include <netdb.h>
+# if defined(__openbsd__)
+# include <sys/uio.h>
+# endif
+#endif
+
+#ifdef MONGO_SSL
+#include <openssl/err.h>
+#include <openssl/ssl.h>
+#endif
+
+
+namespace mongo {
+
+    // file-local switch read by IPv6Enabled(); off by default
+    static bool ipv6 = false;
+
+    /** Turns the IPv6 switch on or off. */
+    void enableIPv6(bool state) {
+        ipv6 = state;
+    }
+
+    /** @return the current value of the IPv6 switch */
+    bool IPv6Enabled() {
+        return ipv6;
+    }
+
+    /**
+     * Sets both the receive and the send timeout of sock to secs seconds
+     * (fractions are carried in the microsecond field).
+     * Failures are only logged, never thrown.
+     */
+    void setSockTimeouts(int sock, double secs) {
+        struct timeval tv;
+        tv.tv_sec = (int)secs;
+        tv.tv_usec = (int)((long long)(secs*1000*1000) % (1000*1000));
+        bool report = logLevel > 3; // solaris doesn't provide these
+        DEV report = true;
+        bool ok = setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (char *) &tv, sizeof(tv) ) == 0;
+        if( report && !ok ) log() << "unabled to set SO_RCVTIMEO" << endl;
+        ok = setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, (char *) &tv, sizeof(tv) ) == 0;
+        // name the option that actually failed (this used to say SO_RCVTIMEO)
+        DEV if( report && !ok ) log() << "unabled to set SO_SNDTIMEO" << endl;
+    }
+
+#if defined(_WIN32)
+    /** Windows variant: turns on TCP_NODELAY and SO_KEEPALIVE for sock, logging any failure. */
+    void disableNagle(int sock) {
+        int enable = 1;
+
+        const int nodelayRc = setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char *) &enable, sizeof(enable));
+        if ( nodelayRc != 0 )
+            error() << "disableNagle failed" << endl;
+
+        const int keepaliveRc = setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (char *) &enable, sizeof(enable));
+        if ( keepaliveRc != 0 )
+            error() << "SO_KEEPALIVE failed" << endl;
+    }
+#else
+
+    /**
+     * Non-Windows variant: turns on TCP_NODELAY and, where available, SO_KEEPALIVE.
+     * On linux the keep-alive idle time and probe interval are additionally
+     * capped at 300 seconds. Failures are only logged.
+     */
+    void disableNagle(int sock) {
+        int x = 1;
+
+        // TCP_NODELAY is a TCP-level option. SOL_TCP is not defined everywhere;
+        // IPPROTO_TCP is the portable level (SOL_SOCKET is the wrong level for it).
+#ifdef SOL_TCP
+        int level = SOL_TCP;
+#else
+        int level = IPPROTO_TCP;
+#endif
+
+        if ( setsockopt(sock, level, TCP_NODELAY, (char *) &x, sizeof(x)) )
+            error() << "disableNagle failed: " << errnoWithDescription() << endl;
+
+#ifdef SO_KEEPALIVE
+        if ( setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (char *) &x, sizeof(x)) )
+            error() << "SO_KEEPALIVE failed: " << errnoWithDescription() << endl;
+
+# ifdef __linux__
+        // x is reused as the in/out value for the keep-alive settings below
+        socklen_t len = sizeof(x);
+        if ( getsockopt(sock, level, TCP_KEEPIDLE, (char *) &x, &len) )
+            error() << "can't get TCP_KEEPIDLE: " << errnoWithDescription() << endl;
+
+        if (x > 300) {
+            x = 300;
+            if ( setsockopt(sock, level, TCP_KEEPIDLE, (char *) &x, sizeof(x)) ) {
+                error() << "can't set TCP_KEEPIDLE: " << errnoWithDescription() << endl;
+            }
+        }
+
+        len = sizeof(x); // just in case it changed
+        if ( getsockopt(sock, level, TCP_KEEPINTVL, (char *) &x, &len) )
+            error() << "can't get TCP_KEEPINTVL: " << errnoWithDescription() << endl;
+
+        if (x > 300) {
+            x = 300;
+            if ( setsockopt(sock, level, TCP_KEEPINTVL, (char *) &x, sizeof(x)) ) {
+                error() << "can't set TCP_KEEPINTVL: " << errnoWithDescription() << endl;
+            }
+        }
+# endif
+#endif
+
+    }
+
+#endif
+
+    /** Turns a getaddrinfo()/getnameinfo() error code into readable text. */
+    string getAddrInfoStrError(int code) {
+#if defined(_WIN32)
+        /* gai_strerrorA is not threadsafe on windows. don't use it. */
+        return errnoWithDescription(code);
+#else
+        return gai_strerror(code);
+#endif
+    }
+
+
+ // --- SockAddr
+
+    /** Builds an IPv4 wildcard address (INADDR_ANY) on the given port. */
+    SockAddr::SockAddr(int sourcePort) {
+        sockaddr_in& in4 = as<sockaddr_in>();
+        memset(in4.sin_zero, 0, sizeof(in4.sin_zero));
+        in4.sin_family = AF_INET;
+        in4.sin_port = htons(sourcePort);
+        in4.sin_addr.s_addr = htonl(INADDR_ANY);
+        addressSize = sizeof(sockaddr_in);
+    }
+
+    /**
+     * Builds an address from a host name, a numeric IP, or a unix socket path.
+     *
+     * - "localhost" is treated as "127.0.0.1"
+     * - a value containing '/' is taken as a unix socket path (port is ignored)
+     * - anything else goes through getaddrinfo, first as a numeric address and,
+     *   if that does not match, with name lookup allowed
+     * If resolution fails the address falls back to the wildcard SockAddr(port).
+     */
+    SockAddr::SockAddr(const char * iporhost , int port) {
+        if (!strcmp(iporhost, "localhost"))
+            iporhost = "127.0.0.1";
+
+        if (strchr(iporhost, '/')) {
+#ifdef _WIN32
+            uassert(13080, "no unix socket support on windows", false);
+#endif
+            // strict "<" leaves room for the terminating NUL copied by strcpy
+            uassert(13079, "path to unix socket too long", strlen(iporhost) < sizeof(as<sockaddr_un>().sun_path));
+            as<sockaddr_un>().sun_family = AF_UNIX;
+            strcpy(as<sockaddr_un>().sun_path, iporhost);
+            addressSize = sizeof(sockaddr_un);
+        }
+        else {
+            addrinfo* addrs = NULL;
+            addrinfo hints;
+            memset(&hints, 0, sizeof(addrinfo));
+            hints.ai_socktype = SOCK_STREAM;
+            //hints.ai_flags = AI_ADDRCONFIG; // This is often recommended but don't do it. SERVER-1579
+            hints.ai_flags |= AI_NUMERICHOST; // first pass tries w/o DNS lookup
+            hints.ai_family = (IPv6Enabled() ? AF_UNSPEC : AF_INET);
+
+            StringBuilder ss;
+            ss << port;
+            int ret = getaddrinfo(iporhost, ss.str().c_str(), &hints, &addrs);
+
+            // old C compilers on IPv6-capable hosts return EAI_NODATA error
+#ifdef EAI_NODATA
+            int nodata = (ret == EAI_NODATA);
+#else
+            int nodata = false;
+#endif
+            if (ret == EAI_NONAME || nodata) {
+                // iporhost isn't an IP address, allow DNS lookup
+                hints.ai_flags &= ~AI_NUMERICHOST;
+                ret = getaddrinfo(iporhost, ss.str().c_str(), &hints, &addrs);
+            }
+
+            if (ret) {
+                // don't log for "0.0.0.0": that address is built during CRT construction and log() may not work yet.
+                if( strcmp("0.0.0.0", iporhost) ) {
+                    // getAddrInfoStrError avoids gai_strerror, which is not threadsafe on windows
+                    log() << "getaddrinfo(\"" << iporhost << "\") failed: " << getAddrInfoStrError(ret) << endl;
+                }
+                *this = SockAddr(port);
+            }
+            else {
+                //TODO: handle other addresses in linked list;
+                assert(addrs->ai_addrlen <= sizeof(sa));
+                memcpy(&sa, addrs->ai_addr, addrs->ai_addrlen);
+                addressSize = addrs->ai_addrlen;
+                freeaddrinfo(addrs);
+            }
+        }
+    }
+
+    /**
+     * @return true for the IPv4 loopback "127.0.0.1", the IPv6 loopback "::1"
+     *         and any unix socket; false for every other address family
+     */
+    bool SockAddr::isLocalHost() const {
+        // every case returns, so nothing may follow the switch
+        switch (getType()) {
+        case AF_INET: return getAddr() == "127.0.0.1";
+        case AF_INET6: return getAddr() == "::1";
+        case AF_UNIX: return true;
+        default: return false;
+        }
+    }
+
+    /** Textual address; ":port" is appended on request for families other than AF_UNIX and AF_UNSPEC. */
+    string SockAddr::toString(bool includePort) const {
+        string out = getAddr();
+        if ( ! includePort )
+            return out;
+        if ( getType() == AF_UNIX || getType() == AF_UNSPEC )
+            return out;
+
+        out += mongoutils::str::stream() << ':' << getPort();
+        return out;
+    }
+
+    /** @return the address family stored in the underlying sockaddr storage */
+    sa_family_t SockAddr::getType() const {
+        const sa_family_t family = sa.ss_family;
+        return family;
+    }
+
+    /**
+     * @return the port in host byte order for AF_INET / AF_INET6, 0 for
+     *         AF_UNIX and AF_UNSPEC; any other family trips a massert
+     */
+    unsigned SockAddr::getPort() const {
+        const sa_family_t family = getType();
+
+        if ( family == AF_INET )
+            return ntohs(as<sockaddr_in>().sin_port);
+        if ( family == AF_INET6 )
+            return ntohs(as<sockaddr_in6>().sin6_port);
+        if ( family == AF_UNIX || family == AF_UNSPEC )
+            return 0;
+
+        massert(SOCK_FAMILY_UNKNOWN_ERROR, "unsupported address family", false);
+        return 0;
+    }
+
+    /**
+     * @return the numeric host string for AF_INET / AF_INET6, the socket path (or a
+     *         placeholder) for AF_UNIX, "(NONE)" for AF_UNSPEC; any other family
+     *         trips a massert
+     */
+    string SockAddr::getAddr() const {
+        const sa_family_t family = getType();
+
+        if ( family == AF_INET || family == AF_INET6 ) {
+            const int buflen=128;
+            char buffer[buflen];
+            int ret = getnameinfo(raw(), addressSize, buffer, buflen, NULL, 0, NI_NUMERICHOST);
+            massert(13082, getAddrInfoStrError(ret), ret == 0);
+            return buffer;
+        }
+
+        if ( family == AF_UNIX ) {
+            if ( addressSize > 2 )
+                return as<sockaddr_un>().sun_path;
+            return "anonymous unix socket";
+        }
+
+        if ( family == AF_UNSPEC )
+            return "(NONE)";
+
+        massert(SOCK_FAMILY_UNKNOWN_ERROR, "unsupported address family", false);
+        return "";
+    }
+
+    /** Two addresses are equal when family, port and the family-specific address all match. */
+    bool SockAddr::operator==(const SockAddr& r) const {
+        if ( getType() != r.getType() || getPort() != r.getPort() )
+            return false;
+
+        const sa_family_t family = getType();
+        if ( family == AF_INET )
+            return as<sockaddr_in>().sin_addr.s_addr == r.as<sockaddr_in>().sin_addr.s_addr;
+        if ( family == AF_INET6 )
+            return memcmp(as<sockaddr_in6>().sin6_addr.s6_addr, r.as<sockaddr_in6>().sin6_addr.s6_addr, sizeof(in6_addr)) == 0;
+        if ( family == AF_UNIX )
+            return strcmp(as<sockaddr_un>().sun_path, r.as<sockaddr_un>().sun_path) == 0;
+        if ( family == AF_UNSPEC )
+            return true; // assume all unspecified addresses are the same
+
+        massert(SOCK_FAMILY_UNKNOWN_ERROR, "unsupported address family", false);
+        return false;
+    }
+
+    /** Negation of operator==. */
+    bool SockAddr::operator!=(const SockAddr& r) const {
+        const bool same = ( *this == r );
+        return ! same;
+    }
+
+    /** Ordering by family first, then port, then the family-specific address. */
+    bool SockAddr::operator<(const SockAddr& r) const {
+        if ( getType() != r.getType() )
+            return getType() < r.getType();
+
+        if ( getPort() != r.getPort() )
+            return getPort() < r.getPort();
+
+        const sa_family_t family = getType();
+        if ( family == AF_INET )
+            return as<sockaddr_in>().sin_addr.s_addr < r.as<sockaddr_in>().sin_addr.s_addr;
+        if ( family == AF_INET6 )
+            return memcmp(as<sockaddr_in6>().sin6_addr.s6_addr, r.as<sockaddr_in6>().sin6_addr.s6_addr, sizeof(in6_addr)) < 0;
+        if ( family == AF_UNIX )
+            return strcmp(as<sockaddr_un>().sun_path, r.as<sockaddr_un>().sun_path) < 0;
+        if ( family == AF_UNSPEC )
+            return false;
+
+        massert(SOCK_FAMILY_UNKNOWN_ERROR, "unsupported address family", false);
+        return false;
+    }
+
+ // namespace-scope placeholder address built from "0.0.0.0" and port 0
+ SockAddr unknownAddress( "0.0.0.0", 0 );
+
+ // ------ hostname -------------------
+
+    /** Resolves hostname to an address string; "" when the result is "0.0.0.0". */
+    string hostbyname(const char *hostname) {
+        const string resolved = SockAddr(hostname, 0).getAddr();
+        return resolved == "0.0.0.0" ? string() : resolved;
+    }
+
+ // --- my --
+
+    /**
+     * @return this machine's host name as reported by gethostname(), or "" (after
+     *         logging) when the call fails or yields an empty name
+     */
+    string getHostName() {
+        char buf[256];
+        buf[0] = 0;
+        // leave the last byte for a terminator: gethostname need not write one
+        // when the name had to be truncated
+        int ec = gethostname(buf, sizeof(buf) - 1);
+        buf[sizeof(buf) - 1] = 0;
+        if ( ec || *buf == 0 ) {
+            log() << "can't get this server's hostname " << errnoWithDescription() << endl;
+            return "";
+        }
+        return buf;
+    }
+
+
+ // host name filled in by _hostNameCachedInit()
+ string _hostNameCached;
+ // stores the result of getHostName() in _hostNameCached
+ static void _hostNameCachedInit() {
+ _hostNameCached = getHostName();
+ }
+ // once-flag handed to boost::call_once together with _hostNameCachedInit
+ boost::once_flag _hostNameCachedInitFlags = BOOST_ONCE_INIT;
+
+ /** Returns a copy of the cached host name, running the initializer through boost::call_once first. */
+ string getHostNameCached() {
+ boost::call_once( _hostNameCachedInit , _hostNameCachedInitFlags );
+ return _hostNameCached;
+ }
+
+ // --------- SocketException ----------
+
+ // Flag constants: MSG_NOSIGNAL where the platform defines it, otherwise 0.
+ // NOTE(review): presumably passed to send()/recv(); the use is not visible here.
+#ifdef MSG_NOSIGNAL
+ const int portSendFlags = MSG_NOSIGNAL;
+ const int portRecvFlags = MSG_NOSIGNAL;
+#else
+ const int portSendFlags = 0;
+ const int portRecvFlags = 0;
+#endif
+
+    /** Formats the error code and exception type, plus the server and extra text when present. */
+    string SocketException::toString() const {
+        stringstream out;
+        out << _ei.code << " socket exception [" << _type << "] ";
+
+        if ( ! _server.empty() )
+            out << "server [" << _server << "] ";
+
+        if ( ! _extra.empty() )
+            out << _extra;
+
+        return out.str();
+    }
+
+
+ // ------------ SSLManager -----------------
+
+#ifdef MONGO_SSL
+    /**
+     * Initializes the OpenSSL library and creates an SSL context using the
+     * SSLv23 client or server method, depending on client.
+     * Trips massert 15864 if the context cannot be created.
+     */
+    SSLManager::SSLManager( bool client ) {
+        _client = client;
+        SSL_library_init();
+        SSL_load_error_strings();
+        ERR_load_crypto_strings();
+
+        if ( client )
+            _context = SSL_CTX_new( SSLv23_client_method() );
+        else
+            _context = SSL_CTX_new( SSLv23_server_method() );
+        massert( 15864 , mongoutils::str::stream() << "can't create SSL Context: " << ERR_error_string(ERR_get_error(), NULL) , _context );
+
+        SSL_CTX_set_options( _context, SSL_OP_ALL);
+    }
+
+    /**
+     * Loads a PEM certificate from publicKeyFile and a PEM private key from
+     * privateKeyFile into the context; trips massert 15865 / 15866 on failure.
+     */
+    void SSLManager::setupPubPriv( const string& privateKeyFile , const string& publicKeyFile ) {
+        const int certLoaded = SSL_CTX_use_certificate_file(_context, publicKeyFile.c_str(), SSL_FILETYPE_PEM);
+        massert( 15865 ,
+                 mongoutils::str::stream() << "Can't read SSL certificate from file "
+                 << publicKeyFile << ":" << ERR_error_string(ERR_get_error(), NULL) ,
+                 certLoaded );
+
+        const int keyLoaded = SSL_CTX_use_PrivateKey_file(_context, privateKeyFile.c_str(), SSL_FILETYPE_PEM);
+        massert( 15866 ,
+                 mongoutils::str::stream() << "Can't read SSL private key from file "
+                 << privateKeyFile << " : " << ERR_error_string(ERR_get_error(), NULL) ,
+                 keyLoaded );
+    }
+
+
+    /**
+     * OpenSSL password callback: copies the manager's stored password into buf.
+     *
+     * @param buf      destination buffer supplied by OpenSSL
+     * @param num      size of buf in bytes
+     * @param rwflag   unused
+     * @param userdata the SSLManager registered via SSL_CTX_set_default_passwd_cb_userdata
+     * @return number of password bytes written (excluding the terminating NUL)
+     */
+    int SSLManager::password_cb(char *buf,int num, int rwflag,void *userdata){
+        SSLManager* sm = (SSLManager*)userdata;
+        const string& pass = sm->_password;
+
+        if ( num <= 0 )
+            return 0;
+
+        // never write more than the caller's buffer holds; keep one byte for the NUL
+        size_t n = pass.size();
+        if ( n > (size_t)( num - 1 ) )
+            n = (size_t)( num - 1 );
+
+        memcpy( buf , pass.data() , n );
+        buf[n] = 0;
+        return (int)n;
+    }
+
+    /**
+     * Loads the certificate chain and the private key from the same PEM file.
+     * The password callback is installed before the key is read.
+     * Trips massert 15867 / 15868 on failure.
+     */
+    void SSLManager::setupPEM( const string& keyFile , const string& password ) {
+        _password = password;
+
+        const int chainLoaded = SSL_CTX_use_certificate_chain_file( _context , keyFile.c_str() );
+        massert( 15867 , "Can't read certificate file" , chainLoaded );
+
+        SSL_CTX_set_default_passwd_cb_userdata( _context , this );
+        SSL_CTX_set_default_passwd_cb( _context, &SSLManager::password_cb );
+
+        const int keyLoaded = SSL_CTX_use_PrivateKey_file( _context , keyFile.c_str() , SSL_FILETYPE_PEM );
+        massert( 15868 , "Can't read key file" , keyLoaded );
+    }
+
+ /**
+  * Creates a new SSL object from this manager's context and attaches it to 'fd'.
+  * Asserts (15861) when SSL_new() returns null.
+  * @return the new SSL object
+  */
+ SSL * SSLManager::secure( int fd ) {
+     SSL * const session = SSL_new( _context );
+     massert( 15861 , "can't create SSL" , session );
+     SSL_set_fd( session , fd );
+     return session;
+ }
+
+
+#endif
+
+ // ------------ Socket -----------------
+
+ /** Wraps an already open descriptor 'fd' whose peer is 'remote'; no timeout, log level 0. */
+ Socket::Socket(int fd , const SockAddr& remote)
+     : _fd(fd)
+     , _remote(remote)
+     , _timeout(0)
+     , _logLevel(0) {
+     _init();
+ }
+
+ /** Creates an unconnected socket (_fd == -1) with the given timeout and log level. */
+ Socket::Socket( double timeout, int ll )
+     : _fd(-1)
+     , _timeout(timeout)
+     , _logLevel(ll) {
+     _init();
+ }
+
+ /** Shared constructor tail: zeroes the byte counters and clears the pending SSL manager. */
+ void Socket::_init() {
+     _bytesIn = 0;
+     _bytesOut = 0;
+#ifdef MONGO_SSL
+     _sslAccepted = 0;
+#endif
+ }
+
+ /** Drops the SSL state (if built with SSL) and closes the descriptor when one is open. */
+ void Socket::close() {
+#ifdef MONGO_SSL
+     _ssl.reset();
+#endif
+     if ( _fd < 0 )
+         return; // nothing open
+
+     closesocket( _fd );
+     _fd = -1;
+ }
+
+#ifdef MONGO_SSL
+ /**
+  * Starts SSL on an already connected socket, acting as the client side.
+  * @param ssl manager that supplies the SSL object; must not be null
+  * A failed handshake is logged; later reads/writes on the socket will then fail.
+  */
+ void Socket::secure( SSLManager * ssl ) {
+     assert( ssl );
+     assert( _fd >= 0 );
+     // SSL objects must be released with SSL_free(), not with the default 'delete'
+     _ssl.reset( ssl->secure( _fd ) , SSL_free );
+     int ret = SSL_connect( _ssl.get() );
+     if ( ret != 1 ) {
+         log() << "SSL_connect failed ret: " << ret << " err: " << SSL_get_error( _ssl.get() , ret )
+               << " " << ERR_error_string(ERR_get_error(), NULL)
+               << endl;
+     }
+ }
+
+ /**
+  * Remembers the manager to use for the server-side handshake.
+  * Nothing is negotiated here; postFork() performs the SSL_accept.
+  */
+ void Socket::secureAccepted( SSLManager * ssl ) {
+ _sslAccepted = ssl;
+ }
+#endif
+
+ /**
+  * Completes the server-side SSL setup requested through secureAccepted():
+  * creates the SSL object for this descriptor, runs SSL_accept, and clears the request.
+  * Does nothing when no manager is pending or when built without SSL.
+  */
+ void Socket::postFork() {
+#ifdef MONGO_SSL
+     if ( _sslAccepted ) {
+         // descriptor 0 is valid and -1 means "not open", so test the sign, not truthiness
+         assert( _fd >= 0 );
+         // SSL objects must be released with SSL_free(), not with the default 'delete'
+         _ssl.reset( _sslAccepted->secure( _fd ) , SSL_free );
+         SSL_accept( _ssl.get() );
+         _sslAccepted = 0;
+     }
+#endif
+ }
+
+ /** Background job that runs a blocking ::connect() so the caller can bound the wait. */
+ class ConnectBG : public BackgroundJob {
+ public:
+     ConnectBG(int sock, SockAddr remote)
+         : _sock(sock), _remote(remote) {
+     }
+
+     /** job body: stores the ::connect() return value */
+     void run() {
+         _res = ::connect(_sock, _remote.raw(), _remote.addressSize);
+     }
+
+     string name() const {
+         return "ConnectBG";
+     }
+
+     /** value returned by ::connect(); only set once run() has executed */
+     int inError() const {
+         return _res;
+     }
+
+ private:
+     int _sock;
+     int _res;
+     SockAddr _remote;
+ };
+
+ /**
+  * Opens a stream socket of the address family of 'remote' and connects it.
+  * The ::connect() call runs in a ConnectBG job that is waited on with bg.wait(5000).
+  * @param remote peer address; also stored as _remote
+  * @return true when connected; false when socket creation fails, ::connect() reports an
+  *         error, or the wait expires (the descriptor is closed in the last two cases)
+  */
+ bool Socket::connect(SockAddr& remote) {
+ _remote = remote;
+
+ _fd = socket(remote.getType(), SOCK_STREAM, 0);
+ if ( _fd == INVALID_SOCKET ) {
+ log(_logLevel) << "ERROR: connect invalid socket " << errnoWithDescription() << endl;
+ return false;
+ }
+
+ // apply the send/receive timeouts before connecting
+ if ( _timeout > 0 ) {
+ setTimeout( _timeout );
+ }
+
+ ConnectBG bg(_fd, remote);
+ bg.go();
+ // NOTE(review): 5000 is presumably milliseconds - confirm against BackgroundJob::wait
+ if ( bg.wait(5000) ) {
+ if ( bg.inError() ) {
+ close();
+ return false;
+ }
+ }
+ else {
+ // time out the connect
+ close();
+ bg.wait(); // so bg stays in scope until bg thread terminates
+ return false;
+ }
+
+ // Nagle is left alone for unix domain sockets
+ if (remote.getType() != AF_UNIX)
+ disableNagle(_fd);
+
+#ifdef SO_NOSIGPIPE
+ // osx
+ const int one = 1;
+ setsockopt( _fd , SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(int));
+#endif
+
+ return true;
+ }
+
+ /**
+  * Single raw write: through SSL_write when an SSL session is active, otherwise
+  * through ::send with portSendFlags. Returns whatever the underlying call returns.
+  */
+ int Socket::_send( const char * data , int len ) {
+#ifdef MONGO_SSL
+     if ( _ssl )
+         return SSL_write( _ssl.get() , data , len );
+#endif
+     const int sent = ::send( _fd , data , len , portSendFlags );
+     return sent;
+ }
+
+ // sends all data or throws an exception
+ /**
+  * Writes exactly 'len' bytes from 'data', looping over partial writes.
+  * @param context label included in the log lines
+  * @throws SocketException SEND_TIMEOUT when the write fails with a timeout-style error
+  *         and a timeout is configured, SEND_ERROR for any other failure
+  */
+ void Socket::send( const char * data , int len, const char *context ) {
+ while( len > 0 ) {
+ int ret = _send( data , len );
+ // NOTE(review): only -1 is treated as failure; a return of 0 leaves 'len' unchanged
+ // and the loop tries again - confirm SSL_write can never return 0 here
+ if ( ret == -1 ) {
+
+#ifdef MONGO_SSL
+ if ( _ssl ) {
+ log() << "SSL Error ret: " << ret << " err: " << SSL_get_error( _ssl.get() , ret )
+ << " " << ERR_error_string(ERR_get_error(), NULL)
+ << endl;
+ }
+#endif
+
+ // the two preprocessor branches each open the same 'if' block; it closes below
+#if defined(_WIN32)
+ if ( WSAGetLastError() == WSAETIMEDOUT && _timeout != 0 ) {
+#else
+ if ( ( errno == EAGAIN || errno == EWOULDBLOCK ) && _timeout != 0 ) {
+#endif
+ log(_logLevel) << "Socket " << context << " send() timed out " << _remote.toString() << endl;
+ throw SocketException( SocketException::SEND_TIMEOUT , remoteString() );
+ }
+ else {
+ SocketException::Type t = SocketException::SEND_ERROR;
+ log(_logLevel) << "Socket " << context << " send() "
+ << errnoWithDescription() << ' ' << remoteString() << endl;
+ throw SocketException( t , remoteString() );
+ }
+ }
+ else {
+ // partial write: account for it and advance past the bytes already sent
+ _bytesOut += ret;
+
+ assert( ret <= len );
+ len -= ret;
+ data += ret;
+ }
+ }
+ }
+
+ /** Sends the buffers one after another through the single-buffer send(); throws like it does. */
+ void Socket::_send( const vector< pair< char *, int > > &data, const char *context ) {
+     for( size_t idx = 0; idx < data.size(); ++idx ) {
+         const pair< char *, int >& chunk = data[ idx ];
+         send( chunk.first, chunk.second, context );
+     }
+ }
+
+ // sends all data or throws an exception
+ /**
+  * Sends every buffer in 'data' or throws a SocketException.
+  * With an active SSL session, and on Windows, the buffers are sent one at a time;
+  * elsewhere they are gathered into a single sendmsg() call that is repeated until
+  * all bytes are written. Buffers with a length <= 0 are skipped.
+  * @param data (pointer, length) pairs to send in order
+  * @param context label included in the log lines
+  * @throws SocketException SEND_TIMEOUT / SEND_ERROR
+  */
+ void Socket::send( const vector< pair< char *, int > > &data, const char *context ) {
+
+#ifdef MONGO_SSL
+     if ( _ssl ) {
+         _send( data , context );
+         return;
+     }
+#endif
+
+#if defined(_WIN32)
+     // TODO use scatter/gather api
+     _send( data , context );
+#else
+     vector< struct iovec > d( data.size() );
+     int used = 0; // number of iovec slots actually filled
+     for( vector< pair< char *, int > >::const_iterator j = data.begin(); j != data.end(); ++j ) {
+         if ( j->second > 0 ) {
+             d[ used ].iov_base = j->first;
+             d[ used ].iov_len = j->second;
+             ++used;
+             _bytesOut += j->second;
+         }
+     }
+     if ( used == 0 )
+         return; // nothing to send; also avoids taking &d[ 0 ] of an empty vector
+
+     struct msghdr meta;
+     memset( &meta, 0, sizeof( meta ) );
+     meta.msg_iov = &d[ 0 ];
+     // Count only the filled slots. Unfilled trailing slots have length 0 and are never
+     // consumed by the loop below, so including them made the loop spin forever.
+     meta.msg_iovlen = used;
+
+     while( meta.msg_iovlen > 0 ) {
+         int ret = ::sendmsg( _fd , &meta , portSendFlags );
+         if ( ret == -1 ) {
+             // same timeout test as the single-buffer send()
+             if ( ( errno != EAGAIN && errno != EWOULDBLOCK ) || _timeout == 0 ) {
+                 log(_logLevel) << "Socket " << context << " send() " << errnoWithDescription() << ' ' << remoteString() << endl;
+                 throw SocketException( SocketException::SEND_ERROR , remoteString() );
+             }
+             else {
+                 log(_logLevel) << "Socket " << context << " send() remote timeout " << remoteString() << endl;
+                 throw SocketException( SocketException::SEND_TIMEOUT , remoteString() );
+             }
+         }
+         else {
+             // drop fully sent buffers and trim the partially sent one
+             struct iovec *& i = meta.msg_iov;
+             while( ret > 0 ) {
+                 if ( i->iov_len > unsigned( ret ) ) {
+                     i->iov_len -= ret;
+                     i->iov_base = (char*)(i->iov_base) + ret;
+                     ret = 0;
+                 }
+                 else {
+                     ret -= i->iov_len;
+                     ++i;
+                     --(meta.msg_iovlen);
+                 }
+             }
+         }
+     }
+#endif
+ }
+
+ /**
+  * Reads exactly 'len' bytes into 'buf', looping over partial reads.
+  * @throws SocketException CLOSED when a read returns 0, RECV_TIMEOUT when the read
+  *         fails with a timeout-style error and _timeout > 0, RECV_ERROR otherwise
+  */
+ void Socket::recv( char * buf , int len ) {
+ unsigned retries = 0;
+ while( len > 0 ) {
+ int ret = unsafe_recv( buf , len );
+ if ( ret > 0 ) {
+ // a short read of a very small request is logged
+ if ( len <= 4 && ret != len )
+ log(_logLevel) << "Socket recv() got " << ret << " bytes wanted len=" << len << endl;
+ assert( ret <= len );
+ len -= ret;
+ buf += ret;
+ }
+ else if ( ret == 0 ) {
+ log(3) << "Socket recv() conn closed? " << remoteString() << endl;
+ throw SocketException( SocketException::CLOSED , remoteString() );
+ }
+ else { /* ret < 0 */
+#if defined(_WIN32)
+ int e = WSAGetLastError();
+#else
+ int e = errno;
+# if defined(EINTR)
+ // only the first EINTR is retried; a later one falls through to the error path
+ if( e == EINTR ) {
+ if( ++retries == 1 ) {
+ log() << "EINTR retry" << endl;
+ continue;
+ }
+ }
+# endif
+#endif
+ if ( ( e == EAGAIN
+#if defined(_WIN32)
+ || e == WSAETIMEDOUT
+#endif
+ ) && _timeout > 0 )
+ {
+ // this is a timeout
+ log(_logLevel) << "Socket recv() timeout " << remoteString() <<endl;
+ throw SocketException( SocketException::RECV_TIMEOUT, remoteString() );
+ }
+
+ log(_logLevel) << "Socket recv() " << errnoWithDescription(e) << " " << remoteString() <<endl;
+ throw SocketException( SocketException::RECV_ERROR , remoteString() );
+ }
+ }
+ }
+
+ /**
+  * One raw read of at most 'max' bytes; no looping and no exceptions.
+  * @return the value returned by _recv
+  */
+ int Socket::unsafe_recv( char *buf, int max ) {
+     int x = _recv( buf , max );
+     // count only bytes actually received; a failed read must not decrement the counter
+     if ( x > 0 )
+         _bytesIn += x;
+     return x;
+ }
+
+
+ /**
+  * Single raw read: through SSL_read when an SSL session is active, otherwise
+  * through ::recv with portRecvFlags. Returns whatever the underlying call returns.
+  */
+ int Socket::_recv( char *buf, int max ) {
+#ifdef MONGO_SSL
+     if ( _ssl )
+         return SSL_read( _ssl.get() , buf , max );
+#endif
+     const int got = ::recv( _fd , buf , max , portRecvFlags );
+     return got;
+ }
+
+ /**
+  * Sets SO_RCVTIMEO and SO_SNDTIMEO on the descriptor.
+  * @param secs timeout in seconds; the fractional part becomes microseconds
+  * Failures are not fatal; they are only logged (see 'report').
+  */
+ void Socket::setTimeout( double secs ) {
+     struct timeval tv;
+     tv.tv_sec = (int)secs;
+     tv.tv_usec = (int)((long long)(secs*1000*1000) % (1000*1000));
+     bool report = logLevel > 3; // solaris doesn't provide these
+     DEV report = true;
+     bool ok = setsockopt(_fd, SOL_SOCKET, SO_RCVTIMEO, (char *) &tv, sizeof(tv) ) == 0;
+     if( report && !ok ) log() << "unabled to set SO_RCVTIMEO" << endl;
+     ok = setsockopt(_fd, SOL_SOCKET, SO_SNDTIMEO, (char *) &tv, sizeof(tv) ) == 0;
+     // name the option that actually failed (this used to say SO_RCVTIMEO)
+     DEV if( report && !ok ) log() << "unabled to set SO_SNDTIMEO" << endl;
+ }
+
+#if defined(_WIN32)
+ // Windows only: the constructor of the global 'winsock_init' object calls
+ // WSAStartup for Winsock 2.2 and exits the process when that fails.
+ struct WinsockInit {
+ WinsockInit() {
+ WSADATA d;
+ if ( WSAStartup(MAKEWORD(2,2), &d) != 0 ) {
+ out() << "ERROR: wsastartup failed " << errnoWithDescription() << endl;
+ problem() << "ERROR: wsastartup failed " << errnoWithDescription() << endl;
+ dbexit( EXIT_NTSERVICE_ERROR );
+ }
+ }
+ } winsock_init;
+#endif
+
+} // namespace mongo
diff --git a/util/net/sock.h b/util/net/sock.h
new file mode 100644
index 0000000..1cd5133
--- /dev/null
+++ b/util/net/sock.h
@@ -0,0 +1,256 @@
+// @file sock.h
+
+/* Copyright 2009 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "../../pch.h"
+
+#include <stdio.h>
+#include <sstream>
+#include "../goodies.h"
+#include "../../db/cmdline.h"
+#include "../mongoutils/str.h"
+
+#ifndef _WIN32
+
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <errno.h>
+
+#ifdef __openbsd__
+# include <sys/uio.h>
+#endif
+
+#endif // _WIN32
+
+#ifdef MONGO_SSL
+#include <openssl/ssl.h>
+#endif
+
+namespace mongo {
+
+ const int SOCK_FAMILY_UNKNOWN_ERROR=13078;
+
+ void disableNagle(int sock);
+
+#if defined(_WIN32)
+
+ // POSIX-style names supplied for the Windows build so the declarations below compile
+ typedef short sa_family_t;
+ typedef int socklen_t;
+
+ // This won't actually be used on windows
+ struct sockaddr_un {
+ short sun_family;
+ char sun_path[108]; // length from unix header
+ };
+
+#else // _WIN32
+
+ // Winsock-style names supplied for non-Windows builds
+ inline void closesocket(int s) { close(s); }
+ const int INVALID_SOCKET = -1;
+ typedef int SOCKET;
+
+#endif // _WIN32
+
+ /** @return "<cmdLine.socket>/mongodb-<port>.sock" */
+ inline string makeUnixSockPath(int port) {
+     const string path = mongoutils::str::stream() << cmdLine.socket << "/mongodb-" << port << ".sock";
+     return path;
+ }
+
+ // If an ip address is passed in, just return that. If a hostname is passed
+ // in, look up its ip and return that. Returns "" on failure.
+ string hostbyname(const char *hostname);
+
+ void enableIPv6(bool state=true);
+ bool IPv6Enabled();
+ void setSockTimeouts(int sock, double secs);
+
+ /**
+ * wrapper around the OS representation of a network address
+ */
+ struct SockAddr {
+ /** zeroed address with family AF_UNSPEC; addressSize covers the whole storage */
+ SockAddr() {
+ addressSize = sizeof(sa);
+ memset(&sa, 0, sizeof(sa));
+ sa.ss_family = AF_UNSPEC;
+ }
+ SockAddr(int sourcePort); /* listener side */
+ SockAddr(const char *ip, int port); /* EndPoint (remote) side, or if you want to specify which interface locally */
+
+ /** reinterprets the underlying storage as a concrete sockaddr type T (unchecked cast) */
+ template <typename T> T& as() { return *(T*)(&sa); }
+ template <typename T> const T& as() const { return *(const T*)(&sa); }
+
+ string toString(bool includePort=true) const;
+
+ /**
+ * @return one of AF_INET, AF_INET6, or AF_UNIX
+ */
+ sa_family_t getType() const;
+
+ unsigned getPort() const;
+
+ string getAddr() const;
+
+ bool isLocalHost() const;
+
+ bool operator==(const SockAddr& r) const;
+
+ bool operator!=(const SockAddr& r) const;
+
+ bool operator<(const SockAddr& r) const;
+
+ /** the storage viewed as a generic sockaddr, as taken by the socket system calls */
+ const sockaddr* raw() const {return (sockaddr*)&sa;}
+ sockaddr* raw() {return (sockaddr*)&sa;}
+
+ /** size handed to the socket calls together with raw() */
+ socklen_t addressSize;
+ private:
+ struct sockaddr_storage sa;
+ };
+
+ extern SockAddr unknownAddress; // ( "0.0.0.0", 0 )
+
+ /** this is not cached and does a syscall */
+ string getHostName();
+
+ /** this is cached, so if the host name changes during the process lifetime
+ * the returned value will be stale */
+ string getHostNameCached();
+
+ /**
+ * thrown by Socket and SockAddr
+ */
+ class SocketException : public DBException {
+ public:
+ /** kind of failure; fixed at construction */
+ const enum Type { CLOSED , RECV_ERROR , SEND_ERROR, RECV_TIMEOUT, SEND_TIMEOUT, FAILED_STATE, CONNECT_ERROR } _type;
+
+ /**
+ * @param t kind of failure
+ * @param server text reported as "server [...]" by toString()
+ * @param code error code passed to DBException (defaults to 9001)
+ * @param extra free text appended by toString()
+ */
+ SocketException( Type t , string server , int code = 9001 , string extra="" )
+ : DBException( "socket exception" , code ) , _type(t) , _server(server), _extra(extra){ }
+ virtual ~SocketException() throw() {}
+
+ /** false only for CLOSED */
+ bool shouldPrint() const { return _type != CLOSED; }
+ virtual string toString() const;
+
+ private:
+ string _server;
+ string _extra;
+ };
+
+#ifdef MONGO_SSL
+ /** Owns an OpenSSL SSL_CTX and hands out SSL objects for individual descriptors. */
+ class SSLManager : boost::noncopyable {
+ public:
+ /** @param client true for the client side of connections, false for the server side */
+ SSLManager( bool client );
+
+ /** certificate chain and private key both come from keyFile; password unlocks the key */
+ void setupPEM( const string& keyFile , const string& password );
+ /** certificate and private key come from two separate PEM files */
+ void setupPubPriv( const string& privateKeyFile , const string& publicKeyFile );
+
+ /**
+ * creates an SSL object to be used for this file descriptor
+ * the caller owns the result
+ * NOTE(review): OpenSSL objects are released with SSL_free(), not 'delete' - confirm callers
+ */
+ SSL * secure( int fd );
+
+ /** OpenSSL pass phrase callback; userdata must be the SSLManager */
+ static int password_cb( char *buf,int num, int rwflag,void *userdata );
+
+ private:
+ bool _client;
+ SSL_CTX* _context;
+ string _password;
+ };
+#endif
+
+ /**
+ * thin wrapper around a file descriptor and system calls
+ * todo: ssl
+ */
+ class Socket {
+ public:
+ /** wraps an already open descriptor whose peer is farEnd */
+ Socket(int sock, const SockAddr& farEnd);
+
+ /** In some cases the timeout will actually be 2x this value - eg we do a partial send,
+ then the timeout fires, then we try to send again, then the timeout fires again with
+ no data sent, then we detect that the other side is down.
+
+ Generally you don't want a timeout, you should be very prepared for errors if you set one.
+ */
+ Socket(double so_timeout = 0, int logLevel = 0 );
+
+ /** @return false when the connection could not be established */
+ bool connect(SockAddr& farEnd);
+ void close();
+
+ /** both overloads send everything or throw SocketException; context labels log lines */
+ void send( const char * data , int len, const char *context );
+ void send( const vector< pair< char *, int > > &data, const char *context );
+
+ // recv len or throw SocketException
+ void recv( char * data , int len );
+ /** single read of at most max bytes; returns the raw result instead of throwing */
+ int unsafe_recv( char *buf, int max );
+
+ int getLogLevel() const { return _logLevel; }
+ void setLogLevel( int ll ) { _logLevel = ll; }
+
+ SockAddr remoteAddr() const { return _remote; }
+ string remoteString() const { return _remote.toString(); }
+ unsigned remotePort() const { return _remote.getPort(); }
+
+ /** byte counters since construction or the last clearCounters() */
+ void clearCounters() { _bytesIn = 0; _bytesOut = 0; }
+ long long getBytesIn() const { return _bytesIn; }
+ long long getBytesOut() const { return _bytesOut; }
+
+ /** sets the send and receive timeouts on the descriptor, in seconds */
+ void setTimeout( double secs );
+
+#ifdef MONGO_SSL
+ /** secures inline */
+ void secure( SSLManager * ssl );
+
+ /** records the manager for a server-side handshake that postFork() performs */
+ void secureAccepted( SSLManager * ssl );
+#endif
+
+ /**
+ * call this after a fork for server sockets
+ */
+ void postFork();
+
+ private:
+ void _init();
+ /** raw send, same semantics as ::send */
+ int _send( const char * data , int len );
+
+ /** sends dumbly, just each buffer at a time */
+ void _send( const vector< pair< char *, int > > &data, const char *context );
+
+ /** raw recv, same semantics as ::recv */
+ int _recv( char * buf , int max );
+
+ int _fd; // -1 when no descriptor is open
+ SockAddr _remote;
+ double _timeout; // seconds; 0 means no timeout
+
+ long long _bytesIn;
+ long long _bytesOut;
+
+#ifdef MONGO_SSL
+ shared_ptr<SSL> _ssl; // active SSL session, empty when the socket is not secured
+ SSLManager * _sslAccepted; // pending server-side manager, consumed by postFork()
+#endif
+
+ protected:
+ int _logLevel; // passed to log() when logging errors
+
+ };
+
+
+} // namespace mongo
diff --git a/util/optime.h b/util/optime.h
index 7e6be4d..9f78fda 100644
--- a/util/optime.h
+++ b/util/optime.h
@@ -26,7 +26,7 @@ namespace mongo {
ClockSkewException() : DBException( "clock skew exception" , 20001 ) {}
};
- /* replsets use RSOpTime.
+ /* replsets used to use RSOpTime.
M/S uses OpTime.
But this is useable from both.
*/
@@ -36,9 +36,10 @@ namespace mongo {
*/
#pragma pack(4)
class OpTime {
- unsigned i;
+ unsigned i; // ordinal comes first so we can do a single 64 bit compare on little endian
unsigned secs;
static OpTime last;
+ static OpTime skewed();
public:
static void setLast(const Date_t &date) {
last = OpTime(date);
@@ -46,47 +47,48 @@ namespace mongo {
unsigned getSecs() const {
return secs;
}
+ unsigned getInc() const {
+ return i;
+ }
OpTime(Date_t date) {
reinterpret_cast<unsigned long long&>(*this) = date.millis;
+ dassert( (int)secs >= 0 );
}
OpTime(ReplTime x) {
reinterpret_cast<unsigned long long&>(*this) = x;
+ dassert( (int)secs >= 0 );
}
OpTime(unsigned a, unsigned b) {
secs = a;
i = b;
+ dassert( (int)secs >= 0 );
}
OpTime( const OpTime& other ) {
secs = other.secs;
i = other.i;
+ dassert( (int)secs >= 0 );
}
OpTime() {
secs = 0;
i = 0;
}
- static OpTime now() {
+ // it isn't generally safe to not be locked for this. so use now(). some tests use this.
+ static OpTime now_inlock() {
unsigned t = (unsigned) time(0);
- if ( t < last.secs ) {
- bool toLog = false;
- ONCE toLog = true;
- RARELY toLog = true;
- if ( last.i & 0x80000000 )
- toLog = true;
- if ( toLog )
- log() << "clock skew detected prev: " << last.secs << " now: " << t << " trying to handle..." << endl;
- if ( last.i & 0x80000000 ) {
- log() << "ERROR Large clock skew detected, shutting down" << endl;
- throw ClockSkewException();
- }
- t = last.secs;
- }
if ( last.secs == t ) {
last.i++;
return last;
}
+ if ( t < last.secs ) {
+ return skewed(); // separate function to keep out of the hot code path
+ }
last = OpTime(t, 1);
return last;
}
+ static OpTime now() {
+ DEV dbMutex.assertWriteLocked();
+ return now_inlock();
+ }
/* We store OpTime's in the database as BSON Date datatype -- we needed some sort of
64 bit "container" for these values. While these are not really "Dates", that seems a
diff --git a/util/paths.h b/util/paths.h
index ce0a378..2297a9a 100644
--- a/util/paths.h
+++ b/util/paths.h
@@ -19,10 +19,13 @@
#pragma once
#include "mongoutils/str.h"
-
-using namespace mongoutils;
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
namespace mongo {
+
+ using namespace mongoutils;
extern string dbpath;
@@ -76,4 +79,39 @@ namespace mongo {
};
+ /**
+  * @return the st_dev reported by stat() for 'path'
+  * Raises user assertion 13646 when stat() fails.
+  */
+ inline dev_t getPartition(const string& path){
+     struct stat info;
+     const bool statOk = ( stat(path.c_str(), &info) == 0 );
+
+     if ( ! statOk ){
+         uasserted(13646, str::stream() << "stat() failed for file: " << path << " " << errnoWithDescription());
+     }
+
+     return info.st_dev;
+ }
+
+ /** @return true when both paths report the same device; path1 is examined first */
+ inline bool onSamePartition(const string& path1, const string& path2){
+     const dev_t first = getPartition(path1);
+     const dev_t second = getPartition(path2);
+     if ( first == second )
+         return true;
+     return false;
+ }
+
+ /**
+  * fsync()s the directory that contains 'file'. Compiled to a no-op except on linux.
+  * Asserts 13652 (no parent directory), 13650 (cannot open it) or 13651 (fsync failed).
+  */
+ inline void flushMyDirectory(const boost::filesystem::path& file){
+#ifdef __linux__ // this isn't needed elsewhere
+     massert(13652, str::stream() << "Couldn't find parent dir for file: " << file.string(), file.has_branch_path());
+     boost::filesystem::path dir = file.branch_path(); // parent_path in new boosts
+
+     log(1) << "flushing directory " << dir.string() << endl;
+
+     int fd = ::open(dir.string().c_str(), O_RDONLY); // DO NOT THROW OR ASSERT BEFORE CLOSING
+     massert(13650, str::stream() << "Couldn't open directory '" << dir.string() << "' for flushing: " << errnoWithDescription(), fd >= 0);
+
+     const bool synced = ( fsync(fd) == 0 );
+     if ( synced ){
+         close(fd);
+         return;
+     }
+
+     // remember errno before close() can overwrite it, and close before asserting
+     int e = errno;
+     close(fd);
+     massert(13651, str::stream() << "Couldn't fsync directory '" << dir.string() << "': " << errnoWithDescription(e), false);
+#endif
+ }
+
}
diff --git a/util/processinfo.h b/util/processinfo.h
index b10e6fe..5272831 100644
--- a/util/processinfo.h
+++ b/util/processinfo.h
@@ -53,8 +53,8 @@ namespace mongo {
bool supported();
- bool blockCheckSupported();
- bool blockInMemory( char * start );
+ static bool blockCheckSupported();
+ static bool blockInMemory( char * start );
private:
pid_t _pid;
@@ -62,6 +62,6 @@ namespace mongo {
void writePidFile( const std::string& path );
- void printMemInfo( const char * where );
+ void printMemInfo( const char * whereContextStr = 0 );
}
diff --git a/util/processinfo_darwin.cpp b/util/processinfo_darwin.cpp
index c1190ae..9f73cbf 100644
--- a/util/processinfo_darwin.cpp
+++ b/util/processinfo_darwin.cpp
@@ -19,15 +19,14 @@
#include "processinfo.h"
#include "log.h"
-
+#include <mach/vm_statistics.h>
#include <mach/task_info.h>
-
#include <mach/mach_init.h>
#include <mach/mach_host.h>
#include <mach/mach_traps.h>
#include <mach/task.h>
#include <mach/vm_map.h>
-#include <mach/shared_memory_server.h>
+#include <mach/shared_region.h>
#include <iostream>
#include <sys/types.h>
diff --git a/util/processinfo_win32.cpp b/util/processinfo_win32.cpp
index d62b21b..ec66aec 100644
--- a/util/processinfo_win32.cpp
+++ b/util/processinfo_win32.cpp
@@ -17,10 +17,7 @@
#include "pch.h"
#include "processinfo.h"
-
#include <iostream>
-
-#include <windows.h>
#include <psapi.h>
using namespace std;
diff --git a/util/queue.h b/util/queue.h
index 6a1e33a..4223bd6 100644
--- a/util/queue.h
+++ b/util/queue.h
@@ -43,6 +43,12 @@ namespace mongo {
return _queue.empty();
}
+ /** @return number of queued items; _lock is held while the size is read */
+ size_t size() const {
+ scoped_lock l( _lock );
+ return _queue.size();
+ }
+
+
bool tryPop( T & t ) {
scoped_lock l( _lock );
if ( _queue.empty() )
diff --git a/util/ramlog.cpp b/util/ramlog.cpp
new file mode 100644
index 0000000..69ffc17
--- /dev/null
+++ b/util/ramlog.cpp
@@ -0,0 +1,190 @@
+// ramlog.cpp
+
+/* Copyright 2009 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "pch.h"
+#include "log.h"
+#include "ramlog.h"
+#include "mongoutils/html.h"
+#include "mongoutils/str.h"
+
+namespace mongo {
+
+ using namespace mongoutils;
+
+ /**
+  * Creates an empty log. When 'name' is non-empty the log is also registered in the
+  * static name -> RamLog map so that get(name) / getNames() can find it.
+  */
+ RamLog::RamLog( string name ) : _name(name), _lastWrite(0) {
+ h = 0; n = 0;
+ // the last byte of every line stays 0, so over-long lines copied by write() remain terminated
+ for( int i = 0; i < N; i++ )
+ lines[i][C-1] = 0;
+
+ if ( name.size() ) {
+
+ // the lock and the map are created on first use
+ // NOTE(review): this first-use check is itself unsynchronized - confirm construction is single-threaded
+ if ( ! _namedLock )
+ _namedLock = new mongo::mutex("RamLog::_namedLock");
+
+ scoped_lock lk( *_namedLock );
+ if ( ! _named )
+ _named = new RM();
+ (*_named)[name] = this;
+ }
+
+ }
+
+ /** Intentionally empty; a named log is not removed from the name map here. */
+ RamLog::~RamLog() {
+
+ }
+
+ /**
+  * Appends one line to the ring buffer, overwriting the oldest line once N lines are stored.
+  * A single trailing '\n' is dropped; lines of C or more characters are cut to C-1 characters.
+  * @param ll unused
+  * @param str text to store
+  */
+ void RamLog::write(LogLevel ll, const string& str) {
+     _lastWrite = time(0);
+
+     char *p = lines[(h+n)%N];
+
+     unsigned sz = str.size();
+     if( sz < C ) {
+         // guard sz > 0: an empty string has no last character to inspect
+         if ( sz > 0 && str[sz-1] == '\n' ) {
+             memcpy(p, str.c_str(), sz-1);
+             p[sz-1] = 0;
+         }
+         else
+             strcpy(p, str.c_str());
+     }
+     else {
+         // p[C-1] was set to 0 by the constructor and is never overwritten
+         memcpy(p, str.c_str(), C-1);
+     }
+
+     if( n < N ) n++;
+     else h = (h+1) % N;
+ }
+
+ /** Appends pointers to the stored lines to 'v', oldest first. The pointers refer to internal storage. */
+ void RamLog::get( vector<const char*>& v) const {
+     unsigned idx = h;
+     for( unsigned count = 0; count < n; ++count ) {
+         v.push_back(lines[idx]);
+         idx = (idx+1) % N;
+     }
+ }
+
+ /**
+  * Looks for a run of lines starting at v[i] that repeats the lines directly before it.
+  * Lines are compared from offset 20 onwards, so each line must be at least 20 characters long.
+  * At most the 7 preceding lines are considered as the start of the earlier run.
+  * @return index j < i where the earlier run starts, or -1 when there is no repeat
+  */
+ int RamLog::repeats(const vector<const char *>& v, int i) {
+ for( int j = i-1; j >= 0 && j+8 > i; j-- ) {
+ if( strcmp(v[i]+20,v[j]+20) == 0 ) {
+ // v[j] matches v[i]; check that v[j+1..i-1] matches v[i+1..]
+ for( int x = 1; ; x++ ) {
+ if( j+x == i ) return j;
+ if( i+x>=(int) v.size() ) return -1;
+ if( strcmp(v[i+x]+20,v[j+x]+20) ) return -1;
+ }
+ return -1;
+ }
+ }
+ return -1;
+ }
+
+
+ /**
+  * Prepares a line for display: when the first 11 characters of v[i] equal those of
+  * v[i-1], that prefix is replaced by blank padding.
+  * @param v all lines
+  * @param i index of the line being rendered
+  * @param line text to render instead of v[i]; empty means "use v[i]"
+  * @return the text to display
+  */
+ string RamLog::clean(const vector<const char *>& v, int i, string line ) {
+     if( line.empty() ) line = v[i];
+     // the size check keeps substr(11) from throwing on a short line
+     if( i > 0 && line.size() >= 11 && strncmp(v[i], v[i-1], 11) == 0 )
+         return string(" ") + line.substr(11);
+     // return the requested text; this used to return v[i] and drop a caller-supplied line
+     return line;
+ }
+
+ /**
+  * Wraps a line in an html color helper according to the text after "replSet ":
+  * "warning"/"error" -> red; "info" ending in " up\n" -> green; "info" containing
+  * " down " or ending in " down\n" -> yellow; everything else is returned unchanged.
+  */
+ string RamLog::color(string line) {
+     string s = str::after(line, "replSet ");
+
+     if( str::startsWith(s, "warning") || startsWith(s, "error") )
+         return html::red(line);
+
+     if( ! str::startsWith(s, "info") )
+         return line;
+
+     if( str::endsWith(s, " up\n") )
+         return html::green(line);
+
+     if( str::contains(s, " down ") || str::endsWith(s, " down\n") )
+         return html::yellow(line);
+
+     return line; //html::blue(line);
+ }
+
+ /* turn http:... into an anchor */
+ /**
+  * Turns the first "http://..." in 's' into an anchor. The url runs up to the next
+  * space or the end of the string. Returns 's' unchanged when it has no "http://".
+  */
+ string RamLog::linkify(const char *s) {
+     const char *start = strstr(s, "http://");
+     if( start == 0 )
+         return s;
+
+     // skip the 7 characters of "http://", then scan to the end of the url
+     const char *end = start + 7;
+     while( *end && *end != ' ' )
+         ++end;
+
+     const string url(start, end-start);
+     stringstream out;
+     out << string(s, start-s) << "<a href=\"" << url << "\">" << url << "</a>" << end;
+     return out.str();
+ }
+
+ /**
+  * Writes all stored lines to 's' inside a <pre> block. Runs of lines that repeat the
+  * preceding lines (see repeats()) are collapsed into a single summary entry.
+  * Every stored line must be longer than 20 characters (asserted).
+  */
+ void RamLog::toHTML(stringstream& s) {
+ vector<const char*> v;
+ get( v );
+
+ s << "<pre>\n";
+ for( int i = 0; i < (int)v.size(); i++ ) {
+ assert( strlen(v[i]) > 20 );
+ int r = repeats(v, i);
+ if( r < 0 ) {
+ // ordinary line
+ s << color( linkify( clean(v,i).c_str() ) );
+ }
+ else {
+ // v[r..i-1] repeats at v[i..last]: emit the first 20 chars plus one dot per repeated line
+ stringstream x;
+ x << string(v[i], 0, 20);
+ int nr = (i-r);
+ int last = i+nr-1;
+ for( ; r < i ; r++ ) x << '.';
+ if( 1 ) {
+ stringstream r;
+ if( nr == 1 ) r << "repeat last line";
+ else r << "repeats last " << nr << " lines; ends " << string(v[last]+4,0,15);
+ s << html::a("", r.str(), clean(v,i,x.str()));
+ }
+ else s << x.str();
+ s << '\n';
+ // skip the lines covered by the summary
+ i = last;
+ }
+ }
+ s << "</pre>\n";
+ }
+
+ // ---------------
+ // static things
+ // ---------------
+
+ /** @return the log registered under 'name', or 0 when there is none */
+ RamLog* RamLog::get( string name ) {
+     if ( ! _named )
+         return 0;
+
+     scoped_lock lk( *_namedLock );
+     RM::iterator found = _named->find( name );
+     const bool missing = ( found == _named->end() );
+     if ( missing )
+         return 0;
+     return found->second;
+ }
+
+ /** Appends to 'names' the name of every registered log that holds at least one line. */
+ void RamLog::getNames( vector<string>& names ) {
+     if ( ! _named )
+         return;
+
+     scoped_lock lk( *_namedLock );
+     RM::iterator it = _named->begin();
+     while ( it != _named->end() ) {
+         const bool hasLines = ( it->second->n != 0 );
+         if ( hasLines )
+             names.push_back( it->first );
+         ++it;
+     }
+ }
+
+ // registry of named logs; both pointers start out null and are allocated by the
+ // first RamLog constructed with a non-empty name
+ mongo::mutex* RamLog::_namedLock;
+ RamLog::RM* RamLog::_named = 0;
+
+ // created during static initialization and never deleted
+ Tee* const warnings = new RamLog("warnings"); // Things put here go in serverStatus
+}
diff --git a/util/ramlog.h b/util/ramlog.h
index b2f3aa0..d3d5c8f 100644
--- a/util/ramlog.h
+++ b/util/ramlog.h
@@ -1,4 +1,4 @@
-// log.h
+// ramlog.h
/* Copyright 2009 10gen Inc.
*
@@ -18,124 +18,48 @@
#pragma once
#include "log.h"
-#include "mongoutils/html.h"
namespace mongo {
class RamLog : public Tee {
- enum {
- N = 128,
- C = 256
- };
- char lines[N][C];
- unsigned h, n;
-
public:
- RamLog() {
- h = 0; n = 0;
- for( int i = 0; i < N; i++ )
- lines[i][C-1] = 0;
- }
-
- virtual void write(LogLevel ll, const string& str) {
- char *p = lines[(h+n)%N];
- if( str.size() < C )
- strcpy(p, str.c_str());
- else
- memcpy(p, str.c_str(), C-1);
- if( n < N ) n++;
- else h = (h+1) % N;
- }
-
- void get( vector<const char*>& v) const {
- for( unsigned x=0, i=h; x++ < n; i=(i+1)%N )
- v.push_back(lines[i]);
- }
-
- static int repeats(const vector<const char *>& v, int i) {
- for( int j = i-1; j >= 0 && j+8 > i; j-- ) {
- if( strcmp(v[i]+20,v[j]+20) == 0 ) {
- for( int x = 1; ; x++ ) {
- if( j+x == i ) return j;
- if( i+x>=(int) v.size() ) return -1;
- if( strcmp(v[i+x]+20,v[j+x]+20) ) return -1;
- }
- return -1;
- }
- }
- return -1;
- }
-
-
- static string clean(const vector<const char *>& v, int i, string line="") {
- if( line.empty() ) line = v[i];
- if( i > 0 && strncmp(v[i], v[i-1], 11) == 0 )
- return string(" ") + line.substr(11);
- return v[i];
- }
-
- static string color(string line) {
- string s = str::after(line, "replSet ");
- if( str::startsWith(s, "warning") || startsWith(s, "error") )
- return html::red(line);
- if( str::startsWith(s, "info") ) {
- if( str::endsWith(s, " up\n") )
- return html::green(line);
- else if( str::contains(s, " down ") || str::endsWith(s, " down\n") )
- return html::yellow(line);
- return line; //html::blue(line);
- }
-
- return line;
- }
+ RamLog( string name );
+
+ virtual void write(LogLevel ll, const string& str);
+
+ void get( vector<const char*>& v) const;
+
+ void toHTML(stringstream& s);
+
+ static RamLog* get( string name );
+ static void getNames( vector<string>& names );
+
+ time_t lastWrite() { return _lastWrite; } // 0 if no writes
+
+ protected:
+ static int repeats(const vector<const char *>& v, int i);
+ static string clean(const vector<const char *>& v, int i, string line="");
+ static string color(string line);
/* turn http:... into an anchor */
- string linkify(const char *s) {
- const char *p = s;
- const char *h = strstr(p, "http://");
- if( h == 0 ) return s;
-
- const char *sp = h + 7;
- while( *sp && *sp != ' ' ) sp++;
-
- string url(h, sp-h);
- stringstream ss;
- ss << string(s, h-s) << "<a href=\"" << url << "\">" << url << "</a>" << sp;
- return ss.str();
- }
-
- void toHTML(stringstream& s) {
- vector<const char*> v;
- get( v );
-
- s << "<pre>\n";
- for( int i = 0; i < (int)v.size(); i++ ) {
- assert( strlen(v[i]) > 20 );
- int r = repeats(v, i);
- if( r < 0 ) {
- s << color( linkify( clean(v,i).c_str() ) );
- }
- else {
- stringstream x;
- x << string(v[i], 0, 20);
- int nr = (i-r);
- int last = i+nr-1;
- for( ; r < i ; r++ ) x << '.';
- if( 1 ) {
- stringstream r;
- if( nr == 1 ) r << "repeat last line";
- else r << "repeats last " << nr << " lines; ends " << string(v[last]+4,0,15);
- s << html::a("", r.str(), clean(v,i,x.str()));
- }
- else s << x.str();
- s << '\n';
- i = last;
- }
- }
- s << "</pre>\n";
- }
+ static string linkify(const char *s);
+ private:
+ ~RamLog(); // want this private as we want to leak so we can use them till the very end
+ enum {
+ N = 128, // number of lines
+ C = 256 // max size of line
+ };
+ char lines[N][C];
+ unsigned h; // current position
+ unsigned n; // number of lines stored, 0 to N
+ string _name;
+
+ typedef map<string,RamLog*> RM;
+ static mongo::mutex* _namedLock;
+ static RM* _named;
+ time_t _lastWrite;
};
}
diff --git a/util/sock.cpp b/util/sock.cpp
deleted file mode 100644
index ef3ed0e..0000000
--- a/util/sock.cpp
+++ /dev/null
@@ -1,235 +0,0 @@
-// @file sock.cpp
-
-/* Copyright 2009 10gen Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "pch.h"
-#include "sock.h"
-
-namespace mongo {
-
- static mongo::mutex sock_mutex("sock_mutex");
-
- static bool ipv6 = false;
- void enableIPv6(bool state) { ipv6 = state; }
- bool IPv6Enabled() { return ipv6; }
-
- string getAddrInfoStrError(int code) {
-#if !defined(_WIN32)
- return gai_strerror(code);
-#else
- /* gai_strerrorA is not threadsafe on windows. don't use it. */
- return errnoWithDescription(code);
-#endif
- }
-
- SockAddr::SockAddr(int sourcePort) {
- memset(as<sockaddr_in>().sin_zero, 0, sizeof(as<sockaddr_in>().sin_zero));
- as<sockaddr_in>().sin_family = AF_INET;
- as<sockaddr_in>().sin_port = htons(sourcePort);
- as<sockaddr_in>().sin_addr.s_addr = htonl(INADDR_ANY);
- addressSize = sizeof(sockaddr_in);
- }
-
- SockAddr::SockAddr(const char * iporhost , int port) {
- if (!strcmp(iporhost, "localhost"))
- iporhost = "127.0.0.1";
-
- if (strchr(iporhost, '/')) {
-#ifdef _WIN32
- uassert(13080, "no unix socket support on windows", false);
-#endif
- uassert(13079, "path to unix socket too long", strlen(iporhost) < sizeof(as<sockaddr_un>().sun_path));
- as<sockaddr_un>().sun_family = AF_UNIX;
- strcpy(as<sockaddr_un>().sun_path, iporhost);
- addressSize = sizeof(sockaddr_un);
- }
- else {
- addrinfo* addrs = NULL;
- addrinfo hints;
- memset(&hints, 0, sizeof(addrinfo));
- hints.ai_socktype = SOCK_STREAM;
- //hints.ai_flags = AI_ADDRCONFIG; // This is often recommended but don't do it. SERVER-1579
- hints.ai_flags |= AI_NUMERICHOST; // first pass tries w/o DNS lookup
- hints.ai_family = (IPv6Enabled() ? AF_UNSPEC : AF_INET);
-
- stringstream ss;
- ss << port;
- int ret = getaddrinfo(iporhost, ss.str().c_str(), &hints, &addrs);
-
- // old C compilers on IPv6-capable hosts return EAI_NODATA error
-#ifdef EAI_NODATA
- int nodata = (ret == EAI_NODATA);
-#else
- int nodata = false;
-#endif
- if (ret == EAI_NONAME || nodata) {
- // iporhost isn't an IP address, allow DNS lookup
- hints.ai_flags &= ~AI_NUMERICHOST;
- ret = getaddrinfo(iporhost, ss.str().c_str(), &hints, &addrs);
- }
-
- if (ret) {
- log() << "getaddrinfo(\"" << iporhost << "\") failed: " << gai_strerror(ret) << endl;
- *this = SockAddr(port);
- }
- else {
- //TODO: handle other addresses in linked list;
- assert(addrs->ai_addrlen <= sizeof(sa));
- memcpy(&sa, addrs->ai_addr, addrs->ai_addrlen);
- addressSize = addrs->ai_addrlen;
- freeaddrinfo(addrs);
- }
- }
- }
-
- bool SockAddr::isLocalHost() const {
- switch (getType()) {
- case AF_INET: return getAddr() == "127.0.0.1";
- case AF_INET6: return getAddr() == "::1";
- case AF_UNIX: return true;
- default: return false;
- }
- assert(false);
- return false;
- }
-
- string hostbyname(const char *hostname) {
- string addr = SockAddr(hostname, 0).getAddr();
- if (addr == "0.0.0.0")
- return "";
- else
- return addr;
- }
-
- class UDPConnection {
- public:
- UDPConnection() {
- sock = 0;
- }
- ~UDPConnection() {
- if ( sock ) {
- closesocket(sock);
- sock = 0;
- }
- }
- bool init(const SockAddr& myAddr);
- int recvfrom(char *buf, int len, SockAddr& sender);
- int sendto(char *buf, int len, const SockAddr& EndPoint);
- int mtu(const SockAddr& sa) {
- return sa.isLocalHost() ? 16384 : 1480;
- }
-
- SOCKET sock;
- };
-
- inline int UDPConnection::recvfrom(char *buf, int len, SockAddr& sender) {
- return ::recvfrom(sock, buf, len, 0, sender.raw(), &sender.addressSize);
- }
-
- inline int UDPConnection::sendto(char *buf, int len, const SockAddr& EndPoint) {
- if ( 0 && rand() < (RAND_MAX>>4) ) {
- out() << " NOTSENT ";
- return 0;
- }
- return ::sendto(sock, buf, len, 0, EndPoint.raw(), EndPoint.addressSize);
- }
-
- inline bool UDPConnection::init(const SockAddr& myAddr) {
- sock = socket(myAddr.getType(), SOCK_DGRAM, IPPROTO_UDP);
- if ( sock == INVALID_SOCKET ) {
- out() << "invalid socket? " << errnoWithDescription() << endl;
- return false;
- }
- if ( ::bind(sock, myAddr.raw(), myAddr.addressSize) != 0 ) {
- out() << "udp init failed" << endl;
- closesocket(sock);
- sock = 0;
- return false;
- }
- socklen_t optLen;
- int rcvbuf;
- if (getsockopt(sock,
- SOL_SOCKET,
- SO_RCVBUF,
- (char*)&rcvbuf,
- &optLen) != -1)
- out() << "SO_RCVBUF:" << rcvbuf << endl;
- return true;
- }
-
- void sendtest() {
- out() << "sendtest\n";
- SockAddr me(27016);
- SockAddr dest("127.0.0.1", 27015);
- UDPConnection c;
- if ( c.init(me) ) {
- char buf[256];
- out() << "sendto: ";
- out() << c.sendto(buf, sizeof(buf), dest) << " " << errnoWithDescription() << endl;
- }
- out() << "end\n";
- }
-
- void listentest() {
- out() << "listentest\n";
- SockAddr me(27015);
- SockAddr sender;
- UDPConnection c;
- if ( c.init(me) ) {
- char buf[256];
- out() << "recvfrom: ";
- out() << c.recvfrom(buf, sizeof(buf), sender) << " " << errnoWithDescription() << endl;
- }
- out() << "end listentest\n";
- }
-
- void xmain();
-
-#if defined(_WIN32)
- namespace {
- struct WinsockInit {
- WinsockInit() {
- WSADATA d;
- if ( WSAStartup(MAKEWORD(2,2), &d) != 0 ) {
- out() << "ERROR: wsastartup failed " << errnoWithDescription() << endl;
- problem() << "ERROR: wsastartup failed " << errnoWithDescription() << endl;
- dbexit( EXIT_NTSERVICE_ERROR );
- }
- }
- } winsock_init;
- }
-#endif
-
- SockAddr unknownAddress( "0.0.0.0", 0 );
-
- ListeningSockets* ListeningSockets::_instance = new ListeningSockets();
-
- ListeningSockets* ListeningSockets::get() {
- return _instance;
- }
-
- string _hostNameCached;
- static void _hostNameCachedInit() {
- _hostNameCached = getHostName();
- }
- boost::once_flag _hostNameCachedInitFlags = BOOST_ONCE_INIT;
-
- string getHostNameCached() {
- boost::call_once( _hostNameCachedInit , _hostNameCachedInitFlags );
- return _hostNameCached;
- }
-
-} // namespace mongo
diff --git a/util/sock.h b/util/sock.h
deleted file mode 100644
index 54dfb49..0000000
--- a/util/sock.h
+++ /dev/null
@@ -1,303 +0,0 @@
-// @file sock.h
-
-/* Copyright 2009 10gen Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include "../pch.h"
-
-#include <stdio.h>
-#include <sstream>
-#include "goodies.h"
-#include "../db/jsobj.h"
-#include "../db/cmdline.h"
-
-namespace mongo {
-
- const int SOCK_FAMILY_UNKNOWN_ERROR=13078;
- string getAddrInfoStrError(int code);
-
-#if defined(_WIN32)
-
- typedef short sa_family_t;
- typedef int socklen_t;
- inline int getLastError() { return WSAGetLastError(); }
- inline void disableNagle(int sock) {
- int x = 1;
- if ( setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char *) &x, sizeof(x)) )
- out() << "ERROR: disableNagle failed" << endl;
- if ( setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (char *) &x, sizeof(x)) )
- out() << "ERROR: SO_KEEPALIVE failed" << endl;
- }
- inline void prebindOptions( int sock ) { }
-
- // This won't actually be used on windows
- struct sockaddr_un {
- short sun_family;
- char sun_path[108]; // length from unix header
- };
-
-#else
-
- extern CmdLine cmdLine;
-
-} // namespace mongo
-
-#include <sys/socket.h>
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <sys/un.h>
-#include <netinet/in.h>
-#include <netinet/tcp.h>
-#include <arpa/inet.h>
-#include <errno.h>
-#include <netdb.h>
-#ifdef __openbsd__
-# include <sys/uio.h>
-#endif
-
-#ifndef AI_ADDRCONFIG
-# define AI_ADDRCONFIG 0
-#endif
-
-namespace mongo {
-
- inline void closesocket(int s) {
- close(s);
- }
- const int INVALID_SOCKET = -1;
- typedef int SOCKET;
-
- inline void disableNagle(int sock) {
- int x = 1;
-
-#ifdef SOL_TCP
- int level = SOL_TCP;
-#else
- int level = SOL_SOCKET;
-#endif
-
- if ( setsockopt(sock, level, TCP_NODELAY, (char *) &x, sizeof(x)) )
- log() << "ERROR: disableNagle failed: " << errnoWithDescription() << endl;
-
-#ifdef SO_KEEPALIVE
- if ( setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (char *) &x, sizeof(x)) )
- log() << "ERROR: SO_KEEPALIVE failed: " << errnoWithDescription() << endl;
-#endif
-
- }
- inline void prebindOptions( int sock ) {
- DEV log() << "doing prebind option" << endl;
- int x = 1;
- if ( setsockopt( sock , SOL_SOCKET, SO_REUSEADDR, &x, sizeof(x)) < 0 )
- out() << "Failed to set socket opt, SO_REUSEADDR" << endl;
- }
-
-
-#endif
-
- inline string makeUnixSockPath(int port) {
- return cmdLine.socket + "/mongodb-" + BSONObjBuilder::numStr(port) + ".sock";
- }
-
- inline void setSockTimeouts(int sock, double secs) {
- struct timeval tv;
- tv.tv_sec = (int)secs;
- tv.tv_usec = (int)((long long)(secs*1000*1000) % (1000*1000));
- bool report = logLevel > 3; // solaris doesn't provide these
- DEV report = true;
- bool ok = setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (char *) &tv, sizeof(tv) ) == 0;
- if( report && !ok ) log() << "unabled to set SO_RCVTIMEO" << endl;
- ok = setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, (char *) &tv, sizeof(tv) ) == 0;
- DEV if( report && !ok ) log() << "unabled to set SO_RCVTIMEO" << endl;
- }
-
- // If an ip address is passed in, just return that. If a hostname is passed
- // in, look up its ip and return that. Returns "" on failure.
- string hostbyname(const char *hostname);
-
- void enableIPv6(bool state=true);
- bool IPv6Enabled();
-
- struct SockAddr {
- SockAddr() {
- addressSize = sizeof(sa);
- memset(&sa, 0, sizeof(sa));
- sa.ss_family = AF_UNSPEC;
- }
- SockAddr(int sourcePort); /* listener side */
- SockAddr(const char *ip, int port); /* EndPoint (remote) side, or if you want to specify which interface locally */
-
- template <typename T>
- T& as() { return *(T*)(&sa); }
- template <typename T>
- const T& as() const { return *(const T*)(&sa); }
-
- string toString(bool includePort=true) const {
- string out = getAddr();
- if (includePort && getType() != AF_UNIX && getType() != AF_UNSPEC)
- out += ':' + BSONObjBuilder::numStr(getPort());
- return out;
- }
-
- // returns one of AF_INET, AF_INET6, or AF_UNIX
- sa_family_t getType() const {
- return sa.ss_family;
- }
-
- unsigned getPort() const {
- switch (getType()) {
- case AF_INET: return ntohs(as<sockaddr_in>().sin_port);
- case AF_INET6: return ntohs(as<sockaddr_in6>().sin6_port);
- case AF_UNIX: return 0;
- case AF_UNSPEC: return 0;
- default: massert(SOCK_FAMILY_UNKNOWN_ERROR, "unsupported address family", false); return 0;
- }
- }
-
- string getAddr() const {
- switch (getType()) {
- case AF_INET:
- case AF_INET6: {
- const int buflen=128;
- char buffer[buflen];
- int ret = getnameinfo(raw(), addressSize, buffer, buflen, NULL, 0, NI_NUMERICHOST);
- massert(13082, getAddrInfoStrError(ret), ret == 0);
- return buffer;
- }
-
- case AF_UNIX: return (addressSize > 2 ? as<sockaddr_un>().sun_path : "anonymous unix socket");
- case AF_UNSPEC: return "(NONE)";
- default: massert(SOCK_FAMILY_UNKNOWN_ERROR, "unsupported address family", false); return "";
- }
- }
-
- bool isLocalHost() const;
-
- bool operator==(const SockAddr& r) const {
- if (getType() != r.getType())
- return false;
-
- if (getPort() != r.getPort())
- return false;
-
- switch (getType()) {
- case AF_INET: return as<sockaddr_in>().sin_addr.s_addr == r.as<sockaddr_in>().sin_addr.s_addr;
- case AF_INET6: return memcmp(as<sockaddr_in6>().sin6_addr.s6_addr, r.as<sockaddr_in6>().sin6_addr.s6_addr, sizeof(in6_addr)) == 0;
- case AF_UNIX: return strcmp(as<sockaddr_un>().sun_path, r.as<sockaddr_un>().sun_path) == 0;
- case AF_UNSPEC: return true; // assume all unspecified addresses are the same
- default: massert(SOCK_FAMILY_UNKNOWN_ERROR, "unsupported address family", false); return false;
- }
- }
- bool operator!=(const SockAddr& r) const {
- return !(*this == r);
- }
- bool operator<(const SockAddr& r) const {
- if (getType() < r.getType())
- return true;
- else if (getType() > r.getType())
- return false;
-
- if (getPort() < r.getPort())
- return true;
- else if (getPort() > r.getPort())
- return false;
-
- switch (getType()) {
- case AF_INET: return as<sockaddr_in>().sin_addr.s_addr < r.as<sockaddr_in>().sin_addr.s_addr;
- case AF_INET6: return memcmp(as<sockaddr_in6>().sin6_addr.s6_addr, r.as<sockaddr_in6>().sin6_addr.s6_addr, sizeof(in6_addr)) < 0;
- case AF_UNIX: return strcmp(as<sockaddr_un>().sun_path, r.as<sockaddr_un>().sun_path) < 0;
- case AF_UNSPEC: return false;
- default: massert(SOCK_FAMILY_UNKNOWN_ERROR, "unsupported address family", false); return false;
- }
- }
-
- const sockaddr* raw() const {return (sockaddr*)&sa;}
- sockaddr* raw() {return (sockaddr*)&sa;}
-
- socklen_t addressSize;
- private:
- struct sockaddr_storage sa;
- };
-
- extern SockAddr unknownAddress; // ( "0.0.0.0", 0 )
-
- const int MaxMTU = 16384;
-
- inline string getHostName() {
- char buf[256];
- int ec = gethostname(buf, 127);
- if ( ec || *buf == 0 ) {
- log() << "can't get this server's hostname " << errnoWithDescription() << endl;
- return "";
- }
- return buf;
- }
-
- string getHostNameCached();
-
- class ListeningSockets {
- public:
- ListeningSockets()
- : _mutex("ListeningSockets")
- , _sockets( new set<int>() )
- , _socketPaths( new set<string>() )
- { }
- void add( int sock ) {
- scoped_lock lk( _mutex );
- _sockets->insert( sock );
- }
- void addPath( string path ) {
- scoped_lock lk( _mutex );
- _socketPaths->insert( path );
- }
- void remove( int sock ) {
- scoped_lock lk( _mutex );
- _sockets->erase( sock );
- }
- void closeAll() {
- set<int>* sockets;
- set<string>* paths;
-
- {
- scoped_lock lk( _mutex );
- sockets = _sockets;
- _sockets = new set<int>();
- paths = _socketPaths;
- _socketPaths = new set<string>();
- }
-
- for ( set<int>::iterator i=sockets->begin(); i!=sockets->end(); i++ ) {
- int sock = *i;
- log() << "closing listening socket: " << sock << endl;
- closesocket( sock );
- }
-
- for ( set<string>::iterator i=paths->begin(); i!=paths->end(); i++ ) {
- string path = *i;
- log() << "removing socket file: " << path << endl;
- ::remove( path.c_str() );
- }
- }
- static ListeningSockets* get();
- private:
- mongo::mutex _mutex;
- set<int>* _sockets;
- set<string>* _socketPaths; // for unix domain sockets
- static ListeningSockets* _instance;
- };
-
-} // namespace mongo
diff --git a/util/stringutils.h b/util/stringutils.h
index 60571e6..93598aa 100644
--- a/util/stringutils.h
+++ b/util/stringutils.h
@@ -15,12 +15,12 @@
* limitations under the License.
*/
-#ifndef UTIL_STRING_UTILS_HEADER
-#define UTIL_STRING_UTILS_HEADER
+#pragma once
namespace mongo {
// see also mongoutils/str.h - perhaps move these there?
+ // see also text.h
void splitStringDelim( const string& str , vector<string>* res , char delim );
@@ -40,6 +40,100 @@ namespace mongo {
return string(copy);
}
-} // namespace mongo
+ /**
+ * Non numeric characters are compared lexicographically; numeric substrings
+ * are compared numerically; dots separate ordered comparable subunits.
+ * For convenience, character 255 is greater than anything else.
+ *
+ * Ordering rules, as implemented below:
+ *  - a '.' sorts before any other character found at the same position;
+ *  - character 255 sorts after any other character (dots excepted, they are tested first);
+ *  - a digit sorts after any non-digit character;
+ *  - two runs of digits are compared by length first, then byte-wise; leading
+ *    zeros are ignored only when the run starts a subunit (start of string or
+ *    right after a '.');
+ *  - if one string is a prefix of the other, the longer string is greater.
+ *
+ * @param s1 NUL-terminated string
+ * @param s2 NUL-terminated string
+ * @return negative, zero or positive, like strcmp. Most paths return exactly -1/0/1,
+ *         but equal-length digit runs return the raw strncmp() value, so callers
+ *         should only test the sign.
+ *
+ * NOTE(review): isNumber() is defined outside this hunk; presumably it tests for '0'..'9'.
+ */
+ inline int lexNumCmp( const char *s1, const char *s2 ) {
+ //cout << "START : " << s1 << "\t" << s2 << endl;
+
+ bool startWord = true; // true at the start of each dot-separated subunit
+
+ while( *s1 && *s2 ) {
+
+ bool d1 = ( *s1 == '.' );
+ bool d2 = ( *s2 == '.' );
+ if ( d1 && !d2 ) // s1's subunit ended first, so s1 sorts first
+ return -1;
+ if ( d2 && !d1 )
+ return 1;
+ if ( d1 && d2 ) {
+ ++s1; ++s2;
+ startWord = true;
+ continue;
+ }
+
+ bool p1 = ( *s1 == (char)255 );
+ bool p2 = ( *s2 == (char)255 );
+ //cout << "\t\t " << p1 << "\t" << p2 << endl;
+ if ( p1 && !p2 )
+ return 1;
+ if ( p2 && !p1 )
+ return -1;
+
+ bool n1 = isNumber( *s1 );
+ bool n2 = isNumber( *s2 );
+
+ if ( n1 && n2 ) {
+ // get rid of leading 0s
+ if ( startWord ) {
+ while ( *s1 == '0' ) s1++;
+ while ( *s2 == '0' ) s2++;
+ }
+
+ char * e1 = (char*)s1; // cast drops const; e1/e2 are only read through, never written
+ char * e2 = (char*)s2;
+
+ // find length
+ // if end of string, will break immediately ('\0')
+ while ( isNumber (*e1) ) e1++;
+ while ( isNumber (*e2) ) e2++;
+
+ int len1 = (int)(e1-s1);
+ int len2 = (int)(e2-s2);
+
+ int result;
+ // if one is longer than the other, return
+ if ( len1 > len2 ) {
+ return 1;
+ }
+ else if ( len2 > len1 ) {
+ return -1;
+ }
+ // if the lengths are equal, just strcmp
+ else if ( (result = strncmp(s1, s2, len1)) != 0 ) {
+ return result;
+ }
-#endif // UTIL_STRING_UTILS_HEADER
+ // otherwise, the numbers are equal
+ s1 = e1;
+ s2 = e2;
+ startWord = false;
+ continue;
+ }
+
+ if ( n1 ) // only s1 is at a digit: digits sort after non-digits
+ return 1;
+
+ if ( n2 )
+ return -1;
+
+ if ( *s1 > *s2 ) // NOTE(review): plain char comparison, so bytes >= 0x80 order according to char's signedness
+ return 1;
+
+ if ( *s2 > *s1 )
+ return -1;
+
+ s1++; s2++;
+ startWord = false;
+ }
+
+ if ( *s1 ) // s2 ran out first: the longer string is greater
+ return 1;
+ if ( *s2 )
+ return -1;
+ return 0;
+ }
+
+} // namespace mongo
diff --git a/util/time_support.h b/util/time_support.h
index 5dedec9..ca17807 100644
--- a/util/time_support.h
+++ b/util/time_support.h
@@ -52,6 +52,16 @@ namespace mongo {
return buf;
}
+ /**
+ * Formats `time` as "YYYY-MM-DDTHH:MM:SSZ" (20 characters).
+ * The broken-down time is produced by time_t_to_Struct(); the trailing 'Z' is a literal.
+ * NOTE(review): 'Z' conventionally denotes UTC - confirm time_t_to_Struct() yields UTC here.
+ * @param time seconds since the epoch
+ * @return the formatted timestamp
+ */
+ inline string timeToISOString(time_t time) {
+ struct tm t;
+ time_t_to_Struct( time, &t );
+
+ const char* fmt = "%Y-%m-%dT%H:%M:%SZ";
+ char buf[32];
+ // Format outside of assert(): if assert() is compiled out, an expression inside it is
+ // never evaluated and buf would be returned uninitialized.
+ const size_t len = strftime(buf, sizeof(buf), fmt, &t);
+ assert(len == 20);
+ return buf;
+ }
+ }
+
inline boost::gregorian::date currentDate() {
boost::posix_time::ptime now = boost::posix_time::second_clock::local_time();
return now.date();
@@ -161,35 +171,52 @@ namespace mongo {
}
#endif
- // note this wraps
- inline int tdiff(unsigned told, unsigned tnew) {
- return WrappingInt::diff(tnew, told);
+ extern long long jsTime_virtual_skew;
+ extern boost::thread_specific_ptr<long long> jsTime_virtual_thread_skew;
+
+ // DO NOT TOUCH except for testing
+ inline void jsTimeVirtualSkew( long long skew ){
+ jsTime_virtual_skew = skew;
+ }
+ inline long long getJSTimeVirtualSkew(){
+ return jsTime_virtual_skew;
}
- /** curTimeMillis will overflow - use curTimeMicros64 instead if you care about that. */
- inline unsigned curTimeMillis() {
- boost::xtime xt;
- boost::xtime_get(&xt, boost::TIME_UTC);
- unsigned t = xt.nsec / 1000000;
- return (xt.sec & 0xfffff) * 1000 + t;
+ inline void jsTimeVirtualThreadSkew( long long skew ){
+ jsTime_virtual_thread_skew.reset(new long long(skew));
+ }
+ inline long long getJSTimeVirtualThreadSkew(){
+ if(jsTime_virtual_thread_skew.get()){
+ return *(jsTime_virtual_thread_skew.get());
+ }
+ else return 0;
}
/** Date_t is milliseconds since epoch */
+ inline Date_t jsTime();
+
+ /** warning this will wrap */
+ inline unsigned curTimeMicros();
+
+ inline unsigned long long curTimeMicros64();
+#ifdef _WIN32 // no gettimeofday on windows
+ inline unsigned long long curTimeMillis64() {
+ boost::xtime xt;
+ boost::xtime_get(&xt, boost::TIME_UTC);
+ return ((unsigned long long)xt.sec) * 1000 + xt.nsec / 1000000;
+ }
inline Date_t jsTime() {
boost::xtime xt;
boost::xtime_get(&xt, boost::TIME_UTC);
unsigned long long t = xt.nsec / 1000000;
- return ((unsigned long long) xt.sec * 1000) + t;
+ return ((unsigned long long) xt.sec * 1000) + t + getJSTimeVirtualSkew() + getJSTimeVirtualThreadSkew();
}
-
inline unsigned long long curTimeMicros64() {
boost::xtime xt;
boost::xtime_get(&xt, boost::TIME_UTC);
unsigned long long t = xt.nsec / 1000;
return (((unsigned long long) xt.sec) * 1000000) + t;
- }
-
- // measures up to 1024 seconds. or, 512 seconds with tdiff that is...
+ }
inline unsigned curTimeMicros() {
boost::xtime xt;
boost::xtime_get(&xt, boost::TIME_UTC);
@@ -197,5 +224,30 @@ namespace mongo {
unsigned secs = xt.sec % 1024;
return secs*1000000 + t;
}
+#else
+# include <sys/time.h>
+ inline unsigned long long curTimeMillis64() { // gettimeofday() reading expressed in milliseconds
+ timeval tv;
+ gettimeofday(&tv, NULL);
+ return ((unsigned long long)tv.tv_sec) * 1000 + tv.tv_usec / 1000; // whole seconds -> ms, plus the sub-second part truncated to ms
+ }
+ inline Date_t jsTime() { // gettimeofday() reading in milliseconds, shifted by both virtual skews
+ timeval tv;
+ gettimeofday(&tv, NULL);
+ unsigned long long t = tv.tv_usec / 1000; // sub-second part, truncated to milliseconds
+ return ((unsigned long long) tv.tv_sec * 1000) + t + getJSTimeVirtualSkew() + getJSTimeVirtualThreadSkew(); // skews are signed long long values added to the unsigned millisecond count
+ }
+ inline unsigned long long curTimeMicros64() { // gettimeofday() reading expressed in microseconds, 64-bit
+ timeval tv;
+ gettimeofday(&tv, NULL);
+ return (((unsigned long long) tv.tv_sec) * 1000*1000) + tv.tv_usec; // seconds widened to 64 bits before scaling
+ }
+ inline unsigned curTimeMicros() { // microsecond counter that wraps: only (seconds mod 1024) is kept
+ timeval tv;
+ gettimeofday(&tv, NULL);
+ unsigned secs = tv.tv_sec % 1024; // discard everything but the low 1024 seconds
+ return secs*1000*1000 + tv.tv_usec; // so the result restarts from 0 every 1024 seconds
+ }
+#endif
} // namespace mongo
diff --git a/util/timer.h b/util/timer.h
index f5a21f8..cbfe859 100644
--- a/util/timer.h
+++ b/util/timer.h
@@ -24,44 +24,81 @@ namespace mongo {
/**
* simple scoped timer
*/
- class Timer {
+ class Timer /*copyable*/ {
public:
- Timer() {
- reset();
- }
-
- Timer( unsigned long long start ) {
- old = start;
- }
-
- int seconds() const {
- return (int)(micros() / 1000000);
- }
+ Timer() { reset(); }
+ Timer( unsigned long long startMicros ) { old = startMicros; }
+ int seconds() const { return (int)(micros() / 1000000); }
+ int millis() const { return (int)(micros() / 1000); }
+ int minutes() const { return seconds() / 60; }
+
- int millis() const {
- return (long)(micros() / 1000);
+ /** gets time interval and resets at the same time. this way we can call curTimeMicros
+ once instead of twice if one wanted millis() and then reset().
+ @return time in millis
+ */
+ int millisReset() {
+ unsigned long long now = curTimeMicros64();
+ int m = (int)((now-old)/1000);
+ old = now;
+ return m;
}
+ // note: dubious that the resolution is anywhere near as high as the method name implies!
unsigned long long micros() const {
unsigned long long n = curTimeMicros64();
return n - old;
}
-
unsigned long long micros(unsigned long long & n) const { // returns cur time in addition to timer result
n = curTimeMicros64();
return n - old;
}
- unsigned long long startTime() {
- return old;
- }
-
- void reset() {
- old = curTimeMicros64();
- }
-
+ unsigned long long startTime() const { return old; }
+ void reset() { old = curTimeMicros64(); }
private:
unsigned long long old;
};
+#if 1 // always taken as written: DevTimer is the empty stub below; the _WIN32 variant compiles only if this becomes 0
+ class DevTimer { // no-op stand-in exposing the same constructor and nested scoped type as the measuring variant
+ public:
+ class scoped {
+ public:
+ scoped(DevTimer& dt) { } // does nothing
+ ~scoped() { }
+ };
+ DevTimer(string) { } // name is ignored
+ ~DevTimer() { }
+ };
+#elif defined(_WIN32) // NOTE(review): with "#if 0" on a non-Windows build no DevTimer would be defined at all
+ class DevTimer { // accumulates QueryPerformanceCounter ticks across scoped sections and reports them on destruction
+ const string _name;
+ public:
+ unsigned long long _ticks; // running total of ticks, added to by scoped::~scoped()
+ class scoped {
+ DevTimer& _dt;
+ unsigned long long _start; // counter value captured at construction
+ public:
+ scoped(DevTimer& dt) : _dt(dt) {
+ LARGE_INTEGER i;
+ QueryPerformanceCounter(&i);
+ _start = i.QuadPart;
+ }
+ ~scoped() {
+ LARGE_INTEGER i;
+ QueryPerformanceCounter(&i);
+ _dt._ticks += (i.QuadPart - _start); // ticks elapsed while this object was alive
+ }
+ };
+ DevTimer(string name) : _name(name), _ticks(0) {
+ }
+ ~DevTimer() {
+ LARGE_INTEGER freq;
+ assert( QueryPerformanceFrequency(&freq) ); // NOTE(review): freq is only filled in if this assert expression is evaluated
+ cout << "devtimer\t" << _name << '\t' << _ticks*1000.0/freq.QuadPart << "ms" << endl; // ticks / (ticks per second) * 1000 = milliseconds
+ }
+ };
+#endif
+
} // namespace mongo
diff --git a/util/util.cpp b/util/util.cpp
index 216683a..4528e30 100644
--- a/util/util.cpp
+++ b/util/util.cpp
@@ -21,6 +21,7 @@
#include "file_allocator.h"
#include "optime.h"
#include "time_support.h"
+#include "mongoutils/str.h"
namespace mongo {
@@ -46,6 +47,13 @@ namespace mongo {
static unsigned N = 0;
if ( strcmp( name , "conn" ) == 0 ) {
+ string* x = _threadName.get();
+ if ( x && mongoutils::str::startsWith( *x , "conn" ) ) {
+ int n = atoi( x->c_str() + 4 );
+ if ( n > 0 )
+ return n;
+ warning() << "unexpected thread name [" << *x << "] parsed to " << n << endl;
+ }
unsigned n = ++N;
stringstream ss;
ss << name << n;
@@ -142,17 +150,6 @@ namespace mongo {
struct UtilTest : public UnitTest {
void run() {
- assert( WrappingInt(0) <= WrappingInt(0) );
- assert( WrappingInt(0) <= WrappingInt(1) );
- assert( !(WrappingInt(1) <= WrappingInt(0)) );
- assert( (WrappingInt(0xf0000000) <= WrappingInt(0)) );
- assert( (WrappingInt(0xf0000000) <= WrappingInt(9000)) );
- assert( !(WrappingInt(300) <= WrappingInt(0xe0000000)) );
-
- assert( tdiff(3, 4) == 1 );
- assert( tdiff(4, 3) == -1 );
- assert( tdiff(0xffffffff, 0) == 1 );
-
assert( isPrime(3) );
assert( isPrime(2) );
assert( isPrime(13) );
@@ -184,7 +181,8 @@ namespace mongo {
/* note: can't use malloc herein - may be in signal handler.
logLockless() likely does not comply and should still be fixed todo
- */
+ likewise class string?
+ */
void rawOut( const string &s ) {
if( s.empty() ) return;
diff --git a/util/version.cpp b/util/version.cpp
index 4045cb5..a6efbd5 100644
--- a/util/version.cpp
+++ b/util/version.cpp
@@ -23,11 +23,63 @@
#include <string>
#include "unittest.h"
#include "version.h"
+#include "stringutils.h"
+#include "../db/jsobj.h"
#include "file.h"
+#include "ramlog.h"
+#include "../db/cmdline.h"
namespace mongo {
- const char versionString[] = "1.8.3";
+ /* Approved formats for versionString:
+ * 1.2.3
+ * 1.2.3-pre-
+ * 1.2.3-rc4 (up to rc9)
+ * 1.2.3-rc4-pre-
+ * If you really need to do something else you'll need to fix _versionArray()
+ */
+ const char versionString[] = "2.0.0";
+
+ // Splits a version string on '.' and '-' into a BSON array of ints, followed by one trailing "finalPart" int. See unit test for example outputs
+ static BSONArray _versionArray(const char* version){
+ // this is inefficient, but cached so it doesn't matter
+ BSONArrayBuilder b;
+ string curPart; // characters collected since the last separator
+ const char* c = version;
+ int finalPart = 0; // 0 = final release, -100 = pre, -10 to -1 = -10 + X for rcX
+ do { //walks versionString including NUL byte
+ if (!(*c == '.' || *c == '-' || *c == '\0')){
+ curPart += *c;
+ continue; // in a do/while this jumps to the "*c++" condition, so c still advances
+ }
+
+ try {
+ unsigned num = stringToNum(curPart.c_str()); // presumably throws on non-numeric input (see catch below) - defined elsewhere
+ b.append((int) num);
+ }
+ catch (...){ // not a number
+ if (curPart.empty()){
+ assert(*c == '\0'); // an empty part is only expected at the terminating NUL
+ break;
+ }
+ else if (startsWith(curPart, "rc")){
+ finalPart = -10 + stringToNum(curPart.c_str()+2); // parsing stops here, so a following "-pre-" is ignored
+ break;
+ }
+ else if (curPart == "pre"){
+ finalPart = -100;
+ break;
+ }
+ // NOTE(review): any other non-numeric part falls through and is dropped without a diagnostic
+ }
+
+ curPart = "";
+ } while (*c++); // tests the current char, then advances; the loop ends after the NUL has been processed
+
+ b.append(finalPart); // always the last array element
+ return b.arr();
+ }
+
+ const BSONArray versionArray = _versionArray(versionString);
string mongodVersion() {
stringstream ss;
@@ -61,38 +113,42 @@ namespace mongo {
#endif
void printSysInfo() {
- log() << "build sys info: " << sysInfo() << endl;
+ log() << "build info: " << sysInfo() << endl;
}
+
+ static Tee * startupWarningsLog = new RamLog("startupWarnings"); //intentionally leaked
+
//
- // 32 bit systems warning
+ // system warnings
//
void show_warnings() {
- // each message adds a leading but not a trailing newline
+ // each message adds a leading and a trailing newline
bool warned = false;
{
const char * foo = strchr( versionString , '.' ) + 1;
int bar = atoi( foo );
if ( ( 2 * ( bar / 2 ) ) != bar ) {
- cout << "\n** NOTE: This is a development version (" << versionString << ") of MongoDB.";
- cout << "\n** Not recommended for production." << endl;
+ log() << startupWarningsLog;
+ log() << "** NOTE: This is a development version (" << versionString << ") of MongoDB." << startupWarningsLog;
+ log() << "** Not recommended for production." << startupWarningsLog;
warned = true;
}
}
if ( sizeof(int*) == 4 ) {
- cout << endl;
- cout << "** NOTE: when using MongoDB 32 bit, you are limited to about 2 gigabytes of data" << endl;
- cout << "** see http://blog.mongodb.org/post/137788967/32-bit-limitations" << endl;
- cout << "** with --dur, the limit is lower" << endl;
+ log() << startupWarningsLog;
+ log() << "** NOTE: when using MongoDB 32 bit, you are limited to about 2 gigabytes of data" << startupWarningsLog;
+ log() << "** see http://blog.mongodb.org/post/137788967/32-bit-limitations" << startupWarningsLog;
+ log() << "** with --journal, the limit is lower" << startupWarningsLog;
warned = true;
}
#ifdef __linux__
if (boost::filesystem::exists("/proc/vz") && !boost::filesystem::exists("/proc/bc")) {
- cout << endl;
- cout << "** WARNING: You are running in OpenVZ. This is known to be broken!!!" << endl;
+ log() << startupWarningsLog;
+ log() << "** WARNING: You are running in OpenVZ. This is known to be broken!!!" << startupWarningsLog;
warned = true;
}
@@ -122,22 +178,49 @@ namespace mongo {
const char* space = strchr(line, ' ');
if ( ! space ) {
- cout << "** WARNING: cannot parse numa_maps" << endl;
+ log() << startupWarningsLog;
+ log() << "** WARNING: cannot parse numa_maps" << startupWarningsLog;
warned = true;
}
else if ( ! startsWith(space+1, "interleave") ) {
- cout << endl;
- cout << "** WARNING: You are running on a NUMA machine." << endl;
- cout << "** We suggest launching mongod like this to avoid performance problems:" << endl;
- cout << "** numactl --interleave=all mongod [other options]" << endl;
+ log() << startupWarningsLog;
+ log() << "** WARNING: You are running on a NUMA machine." << startupWarningsLog;
+ log() << "** We suggest launching mongod like this to avoid performance problems:" << startupWarningsLog;
+ log() << "** numactl --interleave=all mongod [other options]" << startupWarningsLog;
warned = true;
}
}
}
+
+ if (cmdLine.dur){
+ // Warn when /proc/sys/vm/overcommit_memory reads 2 while cmdLine.dur is set.
+ fstream f ("/proc/sys/vm/overcommit_memory", ios_base::in);
+ unsigned val = 0; // initialized: a missing or unparsable file must not leave val indeterminate
+ f >> val;
+
+ if (val == 2) {
+ log() << startupWarningsLog;
+ log() << "** WARNING: /proc/sys/vm/overcommit_memory is " << val << startupWarningsLog;
+ log() << "** Journaling works best with it set to 0 or 1" << startupWarningsLog;
+ }
+ }
+
+ if (boost::filesystem::exists("/proc/sys/vm/zone_reclaim_mode")){
+ // Warn when zone_reclaim_mode is anything other than 0.
+ fstream f ("/proc/sys/vm/zone_reclaim_mode", ios_base::in);
+ unsigned val = 0; // initialized: a failed read (e.g. no permission) is treated as "0", i.e. no warning
+ f >> val;
+
+ if (val != 0) {
+ log() << startupWarningsLog;
+ log() << "** WARNING: /proc/sys/vm/zone_reclaim_mode is " << val << startupWarningsLog;
+ log() << "** We suggest setting it to 0" << startupWarningsLog;
+ log() << "** http://www.kernel.org/doc/Documentation/sysctl/vm.txt" << startupWarningsLog;
+ }
+ }
#endif
- if (warned)
- cout << endl;
+ if (warned) {
+ log() << startupWarningsLog;
+ }
}
int versionCmp(StringData rhs, StringData lhs) {
@@ -174,4 +257,28 @@ namespace mongo {
log(1) << "versionCmpTest passed" << endl;
}
} versionCmpTest;
+
+ class VersionArrayTest : public UnitTest { // pins the array layout produced by _versionArray(): numeric parts followed by one final marker
+ public:
+ void run() {
+ assert( _versionArray("1.2.3") == BSON_ARRAY(1 << 2 << 3 << 0) ); // plain release: final marker 0
+ assert( _versionArray("1.2.0") == BSON_ARRAY(1 << 2 << 0 << 0) );
+ assert( _versionArray("2.0.0") == BSON_ARRAY(2 << 0 << 0 << 0) );
+
+ assert( _versionArray("1.2.3-pre-") == BSON_ARRAY(1 << 2 << 3 << -100) ); // "-pre-": final marker -100
+ assert( _versionArray("1.2.0-pre-") == BSON_ARRAY(1 << 2 << 0 << -100) );
+ assert( _versionArray("2.0.0-pre-") == BSON_ARRAY(2 << 0 << 0 << -100) );
+
+ assert( _versionArray("1.2.3-rc0") == BSON_ARRAY(1 << 2 << 3 << -10) ); // "-rcX": final marker -10 + X
+ assert( _versionArray("1.2.0-rc1") == BSON_ARRAY(1 << 2 << 0 << -9) );
+ assert( _versionArray("2.0.0-rc2") == BSON_ARRAY(2 << 0 << 0 << -8) );
+
+ // Note that the pre of an rc is the same as the rc itself
+ assert( _versionArray("1.2.3-rc3-pre-") == BSON_ARRAY(1 << 2 << 3 << -7) );
+ assert( _versionArray("1.2.0-rc4-pre-") == BSON_ARRAY(1 << 2 << 0 << -6) );
+ assert( _versionArray("2.0.0-rc5-pre-") == BSON_ARRAY(2 << 0 << 0 << -5) );
+
+ log(1) << "versionArrayTest passed" << endl;
+ }
+ } versionArrayTest; // file-scope instance of the test
}
diff --git a/util/version.h b/util/version.h
index 779fbdc..64f8b14 100644
--- a/util/version.h
+++ b/util/version.h
@@ -4,11 +4,13 @@
#include <string>
namespace mongo {
+ struct BSONArray;
using std::string;
// mongo version
extern const char versionString[];
+ extern const BSONArray versionArray;
string mongodVersion();
int versionCmp(StringData rhs, StringData lhs); // like strcmp