summaryrefslogtreecommitdiff
path: root/util
diff options
context:
space:
mode:
authorAntonin Kral <a.kral@bobek.cz>2010-08-11 12:38:57 +0200
committerAntonin Kral <a.kral@bobek.cz>2010-08-11 12:38:57 +0200
commit7645618fd3914cb8a20561625913c20d49504a49 (patch)
tree8370f846f58f6d71165b7a0e2eda04648584ec76 /util
parent68c73c3c7608b4c87f07440dc3232801720b1168 (diff)
downloadmongodb-7645618fd3914cb8a20561625913c20d49504a49.tar.gz
Imported Upstream version 1.6.0
Diffstat (limited to 'util')
-rw-r--r--util/allocator.h6
-rw-r--r--util/array.h18
-rw-r--r--util/assert_util.cpp142
-rw-r--r--util/assert_util.h146
-rw-r--r--util/atomic_int.h100
-rw-r--r--util/background.cpp71
-rw-r--r--util/background.h71
-rw-r--r--util/base64.cpp2
-rw-r--r--util/builder.h211
-rw-r--r--util/concurrency/list.h81
-rw-r--r--util/concurrency/msg.h61
-rw-r--r--util/concurrency/mutex.h179
-rw-r--r--util/concurrency/mvar.h (renamed from util/mvar.h)0
-rw-r--r--util/concurrency/readme.txt15
-rw-r--r--util/concurrency/rwlock.h220
-rw-r--r--util/concurrency/spin_lock.cpp66
-rw-r--r--util/concurrency/spin_lock.h48
-rw-r--r--util/concurrency/task.cpp171
-rw-r--r--util/concurrency/task.h72
-rw-r--r--util/concurrency/thread_pool.cpp138
-rw-r--r--util/concurrency/thread_pool.h (renamed from util/thread_pool.h)8
-rw-r--r--util/concurrency/value.h85
-rw-r--r--util/concurrency/vars.cpp52
-rw-r--r--util/debug_util.cpp2
-rw-r--r--util/debug_util.h52
-rw-r--r--util/embedded_builder.h3
-rw-r--r--util/file.h15
-rw-r--r--util/file_allocator.h95
-rw-r--r--util/goodies.h262
-rw-r--r--util/hashtab.h59
-rw-r--r--util/hex.h32
-rw-r--r--util/histogram.cpp129
-rw-r--r--util/histogram.h128
-rw-r--r--util/hostandport.h142
-rw-r--r--util/httpclient.cpp40
-rw-r--r--util/httpclient.h18
-rw-r--r--util/log.cpp127
-rw-r--r--util/log.h247
-rw-r--r--util/lruishmap.h2
-rw-r--r--util/md5main.cpp2
-rw-r--r--util/message.cpp627
-rw-r--r--util/message.h320
-rw-r--r--util/message_server.h23
-rw-r--r--util/message_server_asio.cpp18
-rw-r--r--util/message_server_port.cpp76
-rw-r--r--util/miniwebserver.cpp89
-rw-r--r--util/miniwebserver.h24
-rw-r--r--util/mmap.cpp152
-rw-r--r--util/mmap.h156
-rw-r--r--util/mmap_mm.cpp6
-rw-r--r--util/mmap_posix.cpp45
-rw-r--r--util/mmap_win.cpp67
-rwxr-xr-xutil/mongoutils/README7
-rw-r--r--util/mongoutils/checksum.h32
-rw-r--r--util/mongoutils/html.h158
-rwxr-xr-xutil/mongoutils/mongoutils.vcxproj73
-rwxr-xr-xutil/mongoutils/mongoutils.vcxproj.filters10
-rw-r--r--util/mongoutils/str.h118
-rwxr-xr-xutil/mongoutils/test.cpp34
-rw-r--r--util/ntservice.cpp138
-rw-r--r--util/ntservice.h2
-rw-r--r--util/optime.h48
-rw-r--r--util/password.cpp92
-rw-r--r--util/password.h61
-rw-r--r--util/processinfo.cpp47
-rw-r--r--util/processinfo.h5
-rw-r--r--util/processinfo_darwin.cpp4
-rw-r--r--util/processinfo_linux2.cpp7
-rw-r--r--util/processinfo_none.cpp2
-rw-r--r--util/processinfo_win32.cpp2
-rw-r--r--util/queue.h6
-rw-r--r--util/ramlog.h142
-rw-r--r--util/ramstore.cpp93
-rw-r--r--util/ramstore.h86
-rw-r--r--util/sock.cpp290
-rw-r--r--util/sock.h234
-rw-r--r--util/stringutils.cpp44
-rw-r--r--util/stringutils.h43
-rw-r--r--util/text.cpp117
-rw-r--r--util/text.h142
-rw-r--r--util/thread_pool.cpp139
-rw-r--r--util/util.cpp101
-rw-r--r--util/version.cpp86
-rw-r--r--util/version.h24
-rw-r--r--util/winutil.h44
85 files changed, 5672 insertions, 1680 deletions
diff --git a/util/allocator.h b/util/allocator.h
index 90dbf24..2c07973 100644
--- a/util/allocator.h
+++ b/util/allocator.h
@@ -31,7 +31,9 @@ namespace mongo {
return x;
}
-#define malloc mongo::ourmalloc
-#define realloc mongo::ourrealloc
+#define MONGO_malloc mongo::ourmalloc
+#define malloc MONGO_malloc
+#define MONGO_realloc mongo::ourrealloc
+#define realloc MONGO_realloc
} // namespace mongo
diff --git a/util/array.h b/util/array.h
index 827c00e..8da06fe 100644
--- a/util/array.h
+++ b/util/array.h
@@ -1,5 +1,21 @@
// array.h
+/*
+ * Copyright 2010 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
namespace mongo {
template<typename T>
@@ -70,7 +86,7 @@ namespace mongo {
return _it->_data[_pos];
}
- operator string() const {
+ string toString() const {
stringstream ss;
ss << _pos;
return ss.str();
diff --git a/util/assert_util.cpp b/util/assert_util.cpp
index b4659cc..faa18cb 100644
--- a/util/assert_util.cpp
+++ b/util/assert_util.cpp
@@ -15,10 +15,20 @@
* limitations under the License.
*/
-#include "stdafx.h"
+#include "pch.h"
#include "assert_util.h"
#include "assert.h"
#include "file.h"
+#include <cmath>
+using namespace std;
+
+#ifndef _WIN32
+#include <cxxabi.h>
+#include <sys/file.h>
+#endif
+
+//#include "../bson/bson.h"
+#include "../db/jsobj.h"
namespace mongo {
@@ -41,6 +51,17 @@ namespace mongo {
if ( newvalue >= max )
rollover();
}
+
+ void ExceptionInfo::append( BSONObjBuilder& b , const char * m , const char * c ) const {
+ if ( msg.empty() )
+ b.append( m , "unknown assertion" );
+ else
+ b.append( m , msg );
+
+ if ( code )
+ b.append( c , code );
+ }
+
string getDbContext();
@@ -63,8 +84,7 @@ namespace mongo {
lastAssert[0].set(msg, getDbContext().c_str(), file, line);
stringstream temp;
temp << "assertion " << file << ":" << line;
- AssertionException e;
- e.msg = temp.str();
+ AssertionException e(temp.str(),0);
breakpoint();
throw e;
}
@@ -74,13 +94,8 @@ namespace mongo {
raiseError(0,msg);
}
- int uacount = 0;
void uasserted(int msgid, const char *msg) {
assertionCount.condrollover( ++assertionCount.user );
- if ( ++uacount < 100 )
- log() << "User Exception " << msgid << ":" << msg << endl;
- else
- RARELY log() << "User Exception " << msg << endl;
lastAssert[3].set(msg, getDbContext().c_str(), "", 0);
raiseError(msgid,msg);
throw UserException(msgid, msg);
@@ -88,11 +103,19 @@ namespace mongo {
void msgasserted(int msgid, const char *msg) {
assertionCount.condrollover( ++assertionCount.warning );
- log() << "Assertion: " << msgid << ":" << msg << endl;
+ tlog() << "Assertion: " << msgid << ":" << msg << endl;
lastAssert[2].set(msg, getDbContext().c_str(), "", 0);
raiseError(msgid,msg && *msg ? msg : "massert failure");
breakpoint();
- printStackTrace(); // TEMP?? should we get rid of this? TODO
+ printStackTrace();
+ throw MsgAssertionException(msgid, msg);
+ }
+
+ void msgassertedNoTrace(int msgid, const char *msg) {
+ assertionCount.condrollover( ++assertionCount.warning );
+ log() << "Assertion: " << msgid << ":" << msg << endl;
+ lastAssert[2].set(msg, getDbContext().c_str(), "", 0);
+ raiseError(msgid,msg && *msg ? msg : "massert failure");
throw MsgAssertionException(msgid, msg);
}
@@ -100,12 +123,11 @@ namespace mongo {
stringstream ss;
// errno might not work on all systems for streams
// if it doesn't for a system should deal with here
- ss << msg << " stream invalie: " << OUTPUT_ERRNO;
+ ss << msg << " stream invalid: " << errnoWithDescription();
throw UserException( code , ss.str() );
}
-
- mongo::mutex *Assertion::_mutex = new mongo::mutex();
+ mongo::mutex *Assertion::_mutex = new mongo::mutex("Assertion");
string Assertion::toString() {
if( _mutex == 0 )
@@ -125,89 +147,31 @@ namespace mongo {
return ss.str();
}
-
- class LoggingManager {
- public:
- LoggingManager()
- : _enabled(0) , _file(0) {
- }
-
- void start( const string& lp , bool append ){
- uassert( 10268 , "LoggingManager already started" , ! _enabled );
- _append = append;
-
- // test path
- FILE * test = fopen( lp.c_str() , _append ? "a" : "w" );
- if ( ! test ){
- cout << "can't open [" << lp << "] for log file" << endl;
- dbexit( EXIT_BADOPTIONS );
- assert( 0 );
- }
- fclose( test );
-
- _path = lp;
- _enabled = 1;
- rotate();
- }
-
- void rotate(){
- if ( ! _enabled ){
- cout << "LoggingManager not enabled" << endl;
- return;
- }
+ string errnoWithPrefix( const char * prefix ){
+ stringstream ss;
+ if ( prefix )
+ ss << prefix << ": ";
+ ss << errnoWithDescription();
+ return ss.str();
+ }
+
- if ( _file ){
+ string demangleName( const type_info& typeinfo ){
#ifdef _WIN32
- cout << "log rotation doesn't work on windows" << endl;
- return;
+ return typeinfo.name();
#else
- struct tm t;
- localtime_r( &_opened , &t );
-
- stringstream ss;
- ss << _path << "." << ( 1900 + t.tm_year ) << "-" << t.tm_mon << "-" << t.tm_mday
- << "_" << t.tm_hour << "-" << t.tm_min << "-" << t.tm_sec;
- string s = ss.str();
- rename( _path.c_str() , s.c_str() );
-#endif
- }
-
- _file = freopen( _path.c_str() , _append ? "a" : "w" , stdout );
- if ( ! _file ){
- cerr << "can't open: " << _path.c_str() << " for log file" << endl;
- dbexit( EXIT_BADOPTIONS );
- assert(0);
- }
- _opened = time(0);
- }
-
- private:
+ int status;
- bool _enabled;
- string _path;
- bool _append;
+ char * niceName = abi::__cxa_demangle(typeinfo.name(), 0, 0, &status);
+ if ( ! niceName )
+ return typeinfo.name();
- FILE * _file;
- time_t _opened;
-
- } loggingManager;
-
- void initLogging( const string& lp , bool append ){
- cout << "all output going to: " << lp << endl;
- loggingManager.start( lp , append );
- }
-
- void rotateLogs( int signal ){
- loggingManager.rotate();
+ string s = niceName;
+ free(niceName);
+ return s;
+#endif
}
- string errnostring( const char * prefix ){
- stringstream ss;
- if ( prefix )
- ss << prefix << ": ";
- ss << OUTPUT_ERRNO;
- return ss.str();
- }
}
diff --git a/util/assert_util.h b/util/assert_util.h
index bae3a55..018dc43 100644
--- a/util/assert_util.h
+++ b/util/assert_util.h
@@ -22,6 +22,11 @@
namespace mongo {
+ enum CommonErrorCodes {
+ DatabaseDifferCaseCode = 13297 ,
+ StaleConfigInContextCode = 13388
+ };
+
/* these are manipulated outside of mutexes, so be careful */
struct Assertion {
Assertion() {
@@ -81,68 +86,84 @@ namespace mongo {
};
extern AssertionCount assertionCount;
+
+ struct ExceptionInfo {
+ ExceptionInfo() : msg(""),code(-1){}
+ ExceptionInfo( const char * m , int c )
+ : msg( m ) , code( c ){
+ }
+ ExceptionInfo( const string& m , int c )
+ : msg( m ) , code( c ){
+ }
+
+ void append( BSONObjBuilder& b , const char * m = "$err" , const char * c = "code" ) const ;
+
+ string toString() const { stringstream ss; ss << "exception: " << code << " " << msg; return ss.str(); }
+
+ bool empty() const { return msg.empty(); }
+
+
+ string msg;
+ int code;
+ };
class DBException : public std::exception {
public:
- virtual const char* what() const throw() = 0;
+ DBException( const ExceptionInfo& ei ) : _ei(ei){}
+ DBException( const char * msg , int code ) : _ei(msg,code){}
+ DBException( const string& msg , int code ) : _ei(msg,code){}
+ virtual ~DBException() throw() { }
+
+ virtual const char* what() const throw(){ return _ei.msg.c_str(); }
+ virtual int getCode() const { return _ei.code; }
+
+ virtual void appendPrefix( stringstream& ss ) const { }
+
virtual string toString() const {
- return what();
+ stringstream ss; ss << getCode() << " " << what(); return ss.str();
+ return ss.str();
}
- virtual int getCode() = 0;
- operator string() const { return toString(); }
+
+ const ExceptionInfo& getInfo() const { return _ei; }
+
+ protected:
+ ExceptionInfo _ei;
};
class AssertionException : public DBException {
public:
- int code;
- string msg;
- AssertionException() { code = 0; }
+
+ AssertionException( const ExceptionInfo& ei ) : DBException(ei){}
+ AssertionException( const char * msg , int code ) : DBException(msg,code){}
+ AssertionException( const string& msg , int code ) : DBException(msg,code){}
+
virtual ~AssertionException() throw() { }
- virtual bool severe() {
- return true;
- }
- virtual bool isUserAssertion() {
- return false;
- }
- virtual int getCode(){ return code; }
- virtual const char* what() const throw() { return msg.c_str(); }
+
+ virtual bool severe() { return true; }
+ virtual bool isUserAssertion() { return false; }
/* true if an interrupted exception - see KillCurrentOp */
bool interrupted() {
- return code == 11600 || code == 11601;
+ return _ei.code == 11600 || _ei.code == 11601;
}
};
-
+
/* UserExceptions are valid errors that a user can cause, like out of disk space or duplicate key */
class UserException : public AssertionException {
public:
- UserException(int c , const string& m) {
- code = c;
- msg = m;
- }
- virtual bool severe() {
- return false;
- }
- virtual bool isUserAssertion() {
- return true;
- }
- virtual string toString() const {
- return "userassert:" + msg;
- }
- };
+ UserException(int c , const string& m) : AssertionException( m , c ){}
+ virtual bool severe() { return false; }
+ virtual bool isUserAssertion() { return true; }
+ virtual void appendPrefix( stringstream& ss ) const { ss << "userassert:"; }
+ };
+
class MsgAssertionException : public AssertionException {
public:
- MsgAssertionException(int c, const char *m) {
- code = c;
- msg = m;
- }
- virtual bool severe() {
- return false;
- }
- virtual string toString() const {
- return "massert:" + msg;
- }
+ MsgAssertionException( const ExceptionInfo& ei ) : AssertionException( ei ){}
+ MsgAssertionException(int c, const string& m) : AssertionException( m , c ){}
+ virtual bool severe() { return false; }
+ virtual void appendPrefix( stringstream& ss ) const { ss << "massert:"; }
};
void asserted(const char *msg, const char *file, unsigned line);
@@ -150,6 +171,7 @@ namespace mongo {
void uasserted(int msgid, const char *msg);
inline void uasserted(int msgid , string msg) { uasserted(msgid, msg.c_str()); }
void uassert_nothrow(const char *msg); // reported via lasterror, but don't throw exception
+ void msgassertedNoTrace(int msgid, const char *msg);
void msgasserted(int msgid, const char *msg);
inline void msgasserted(int msgid, string msg) { msgasserted(msgid, msg.c_str()); }
@@ -157,59 +179,67 @@ namespace mongo {
#undef assert
#endif
-#define assert(_Expression) (void)( (!!(_Expression)) || (mongo::asserted(#_Expression, __FILE__, __LINE__), 0) )
+#define MONGO_assert(_Expression) (void)( (!!(_Expression)) || (mongo::asserted(#_Expression, __FILE__, __LINE__), 0) )
+#define assert MONGO_assert
/* "user assert". if asserts, user did something wrong, not our code */
-//#define uassert( 10269 , _Expression) (void)( (!!(_Expression)) || (uasserted(#_Expression, __FILE__, __LINE__), 0) )
-#define uassert(msgid, msg,_Expression) (void)( (!!(_Expression)) || (mongo::uasserted(msgid, msg), 0) )
-
-#define xassert(_Expression) (void)( (!!(_Expression)) || (mongo::asserted(#_Expression, __FILE__, __LINE__), 0) )
-
-#define yassert 1
+#define MONGO_uassert(msgid, msg, expr) (void)( (!!(expr)) || (mongo::uasserted(msgid, msg), 0) )
+#define uassert MONGO_uassert
/* warning only - keeps going */
-#define wassert(_Expression) (void)( (!!(_Expression)) || (mongo::wasserted(#_Expression, __FILE__, __LINE__), 0) )
+#define MONGO_wassert(_Expression) (void)( (!!(_Expression)) || (mongo::wasserted(#_Expression, __FILE__, __LINE__), 0) )
+#define wassert MONGO_wassert
/* display a message, no context, and throw assertionexception
easy way to throw an exception and log something without our stack trace
display happening.
*/
-#define massert(msgid, msg,_Expression) (void)( (!!(_Expression)) || (mongo::msgasserted(msgid, msg), 0) )
+#define MONGO_massert(msgid, msg, expr) (void)( (!!(expr)) || (mongo::msgasserted(msgid, msg), 0) )
+#define massert MONGO_massert
/* dassert is 'debug assert' -- might want to turn off for production as these
could be slow.
*/
#if defined(_DEBUG)
-#define dassert assert
+# define MONGO_dassert assert
#else
-#define dassert(x)
+# define MONGO_dassert(x)
#endif
+#define dassert MONGO_dassert
// some special ids that we want to duplicate
// > 10000 asserts
// < 10000 UserException
-#define ASSERT_ID_DUPKEY 11000
+ enum { ASSERT_ID_DUPKEY = 11000 };
+ /* throws a uassertion with an appropriate msg */
void streamNotGood( int code , string msg , std::ios& myios );
-#define ASSERT_STREAM_GOOD(msgid,msg,stream) (void)( (!!((stream).good())) || (mongo::streamNotGood(msgid, msg, stream), 0) )
+ inline void assertStreamGood(unsigned msgid, string msg, std::ios& myios) {
+ if( !myios.good() ) streamNotGood(msgid, msg, myios);
+ }
+
+ string demangleName( const type_info& typeinfo );
} // namespace mongo
-#define BOOST_CHECK_EXCEPTION( expression ) \
+#define BOOST_CHECK_EXCEPTION MONGO_BOOST_CHECK_EXCEPTION
+#define MONGO_BOOST_CHECK_EXCEPTION( expression ) \
try { \
expression; \
} catch ( const std::exception &e ) { \
- problem() << "caught boost exception: " << e.what() << endl; \
- assert( false ); \
+ stringstream ss; \
+ ss << "caught boost exception: " << e.what(); \
+ msgasserted( 13294 , ss.str() ); \
} catch ( ... ) { \
massert( 10437 , "unknown boost failed" , false ); \
}
-#define DESTRUCTOR_GUARD( expression ) \
+#define DESTRUCTOR_GUARD MONGO_DESTRUCTOR_GUARD
+#define MONGO_DESTRUCTOR_GUARD( expression ) \
try { \
expression; \
} catch ( const std::exception &e ) { \
diff --git a/util/atomic_int.h b/util/atomic_int.h
deleted file mode 100644
index de50560..0000000
--- a/util/atomic_int.h
+++ /dev/null
@@ -1,100 +0,0 @@
-// atomic_int.h
-// atomic wrapper for unsigned
-
-/* Copyright 2009 10gen Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#if defined(_WIN32)
-# include <windows.h>
-#endif
-
-namespace mongo{
-
-
- struct AtomicUInt{
- AtomicUInt() : x(0) {}
- AtomicUInt(unsigned z) : x(z) { }
- volatile unsigned x;
- operator unsigned() const {
- return x;
- }
- inline AtomicUInt operator++(); // ++prefix
- inline AtomicUInt operator++(int);// postfix++
- inline AtomicUInt operator--(); // --prefix
- inline AtomicUInt operator--(int); // postfix--
- };
-
-#if defined(_WIN32)
- AtomicUInt AtomicUInt::operator++(){
- // InterlockedIncrement returns the new value
- return InterlockedIncrement((volatile long*)&x); //long is 32bits in Win64
- }
- AtomicUInt AtomicUInt::operator++(int){
- return InterlockedIncrement((volatile long*)&x)-1;
- }
- AtomicUInt AtomicUInt::operator--(){
- return InterlockedDecrement((volatile long*)&x);
- }
- AtomicUInt AtomicUInt::operator--(int){
- return InterlockedDecrement((volatile long*)&x)+1;
- }
-#elif defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4)
- // this is in GCC >= 4.1
- AtomicUInt AtomicUInt::operator++(){
- return __sync_add_and_fetch(&x, 1);
- }
- AtomicUInt AtomicUInt::operator++(int){
- return __sync_fetch_and_add(&x, 1);
- }
- AtomicUInt AtomicUInt::operator--(){
- return __sync_add_and_fetch(&x, -1);
- }
- AtomicUInt AtomicUInt::operator--(int){
- return __sync_fetch_and_add(&x, -1);
- }
-#elif defined(__GNUC__) && (defined(__i386__) || defined(__x86_64__))
- // from boost 1.39 interprocess/detail/atomic.hpp
-
- inline unsigned atomic_int_helper(volatile unsigned *x, int val){
- int r;
- asm volatile
- (
- "lock\n\t"
- "xadd %1, %0":
- "+m"( *x ), "=r"( r ): // outputs (%0, %1)
- "1"( val ): // inputs (%2 == %1)
- "memory", "cc" // clobbers
- );
- return r;
- }
- AtomicUInt AtomicUInt::operator++(){
- return atomic_int_helper(&x, 1)+1;
- }
- AtomicUInt AtomicUInt::operator++(int){
- return atomic_int_helper(&x, 1);
- }
- AtomicUInt AtomicUInt::operator--(){
- return atomic_int_helper(&x, -1)-1;
- }
- AtomicUInt AtomicUInt::operator--(int){
- return atomic_int_helper(&x, -1);
- }
-#else
-# error "unsupported compiler or platform"
-#endif
-
-} // namespace mongo
diff --git a/util/background.cpp b/util/background.cpp
index 4125315..a6d8290 100644
--- a/util/background.cpp
+++ b/util/background.cpp
@@ -15,14 +15,15 @@
* limitations under the License.
*/
-#include "stdafx.h"
+#include "pch.h"
#include "goodies.h"
#include "background.h"
+#include <list>
namespace mongo {
BackgroundJob *BackgroundJob::grab = 0;
- mongo::mutex BackgroundJob::mutex;
+ mongo::mutex BackgroundJob::mutex("BackgroundJob");
/* static */
void BackgroundJob::thr() {
@@ -31,9 +32,25 @@ namespace mongo {
assert( us->state == NotStarted );
us->state = Running;
grab = 0;
- us->run();
+
+ {
+ string nm = us->name();
+ setThreadName(nm.c_str());
+ }
+
+ try {
+ us->run();
+ }
+ catch ( std::exception& e ){
+ log( LL_ERROR ) << "backgroundjob error: " << e.what() << endl;
+ }
+ catch(...) {
+ log( LL_ERROR ) << "uncaught exception in BackgroundJob" << endl;
+ }
us->state = Done;
- if ( us->deleteSelf )
+ bool delSelf = us->deleteSelf;
+ us->ending();
+ if( delSelf )
delete us;
}
@@ -47,18 +64,56 @@ namespace mongo {
return *this;
}
- bool BackgroundJob::wait(int msMax) {
+ bool BackgroundJob::wait(int msMax, unsigned maxsleep) {
assert( state != NotStarted );
- int ms = 1;
+ unsigned ms = 0;
Date_t start = jsTime();
while ( state != Done ) {
sleepmillis(ms);
- if ( ms < 1000 )
- ms = ms * 2;
+ if( ms*2<maxsleep ) ms*=2;
if ( msMax && ( int( jsTime() - start ) > msMax) )
return false;
}
return true;
}
+ void BackgroundJob::go(list<BackgroundJob*>& L) {
+ for( list<BackgroundJob*>::iterator i = L.begin(); i != L.end(); i++ )
+ (*i)->go();
+ }
+
+ /* wait for several jobs to finish. */
+ void BackgroundJob::wait(list<BackgroundJob*>& L, unsigned maxsleep) {
+ unsigned ms = 0;
+ {
+ x:
+ sleepmillis(ms);
+ if( ms*2<maxsleep ) ms*=2;
+ for( list<BackgroundJob*>::iterator i = L.begin(); i != L.end(); i++ ) {
+ assert( (*i)->state != NotStarted );
+ if( (*i)->state != Done )
+ goto x;
+ }
+ }
+ }
+
+ void PeriodicBackgroundJob::run(){
+ // want to handle first one differently so inShutdown is obeyed nicely
+ sleepmillis( _millis );
+
+ while ( ! inShutdown() ){
+ try {
+ runLoop();
+ }
+ catch ( std::exception& e ){
+ log( LL_ERROR ) << "PeriodicBackgroundJob [" << name() << "] error: " << e.what() << endl;
+ }
+ catch ( ... ){
+ log( LL_ERROR ) << "PeriodicBackgroundJob [" << name() << "] unknown error" << endl;
+ }
+
+ sleepmillis( _millis );
+ }
+ }
+
} // namespace mongo
diff --git a/util/background.h b/util/background.h
index c95a5bd..ee59455 100644
--- a/util/background.h
+++ b/util/background.h
@@ -19,31 +19,47 @@
namespace mongo {
- /* object-orienty background thread dispatching.
+ /** object-orienty background thread dispatching.
subclass and define run()
- It is ok to call go() more than once -- if the previous invocation
- has finished. Thus one pattern of use is to embed a backgroundjob
- in your object and reuse it (or same thing with inheritance).
+ It is ok to call go(), that is, run the job, more than once -- if the
+ previous invocation has finished. Thus one pattern of use is to embed
+ a backgroundjob in your object and reuse it (or same thing with
+ inheritance). Each go() call spawns a new thread.
+
+ note when job destructs, the thread is not terminated if still running.
+ generally if the thread could still be running, allocate the job dynamically
+ and set deleteSelf to true.
+ */
+ /* example
+ class ConnectBG : public BackgroundJob {
+ public:
+ int sock;
+ int res;
+ SockAddr farEnd;
+ void run() {
+ res = ::connect(sock, farEnd.raw(), farEnd.addressSize);
+ }
+ };
*/
- class BackgroundJob {
+ class BackgroundJob : boost::noncopyable {
protected:
- /* define this to do your work! */
+ /** define this to do your work.
+ after this returns, state is set to done.
+ after this returns, deleted if deleteSelf true.
+ */
virtual void run() = 0;
-
+ virtual string name() = 0;
+ virtual void ending() { } // hook for post processing if desired after everything else done. not called when deleteSelf=true
public:
enum State {
NotStarted,
Running,
Done
};
- State getState() const {
- return state;
- }
- bool running() const {
- return state == Running;
- }
+ State getState() const { return state; }
+ bool running() const { return state == Running; }
bool deleteSelf; // delete self when Done?
@@ -53,14 +69,20 @@ namespace mongo {
}
virtual ~BackgroundJob() { }
- // start job. returns before it's finished.
+ // starts job. returns once it is "dispatched"
BackgroundJob& go();
// wait for completion. this spins with sleep() so not terribly efficient.
// returns true if did not time out.
//
// note you can call wait() more than once if the first call times out.
- bool wait(int msMax = 0);
+ bool wait(int msMax = 0, unsigned maxSleepInterval=1000);
+
+ /* start several */
+ static void go(list<BackgroundJob*>&);
+
+ /* wait for several jobs to finish. */
+ static void wait(list<BackgroundJob*>&, unsigned maxSleepInterval=1000);
private:
static BackgroundJob *grab;
@@ -69,4 +91,23 @@ namespace mongo {
volatile State state;
};
+ class PeriodicBackgroundJob : public BackgroundJob {
+ public:
+ PeriodicBackgroundJob( int millisToSleep )
+ : _millis( millisToSleep ){
+ }
+
+ virtual ~PeriodicBackgroundJob(){}
+
+ /** this gets called every millisToSleep ms */
+ virtual void runLoop() = 0;
+
+ virtual void run();
+
+
+ private:
+ int _millis;
+
+ };
+
} // namespace mongo
diff --git a/util/base64.cpp b/util/base64.cpp
index 8d9d544..35a3aba 100644
--- a/util/base64.cpp
+++ b/util/base64.cpp
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-#include "stdafx.h"
+#include "pch.h"
#include "base64.h"
namespace mongo {
diff --git a/util/builder.h b/util/builder.h
deleted file mode 100644
index f9d3514..0000000
--- a/util/builder.h
+++ /dev/null
@@ -1,211 +0,0 @@
-/* builder.h */
-
-/* Copyright 2009 10gen Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include "../stdafx.h"
-#include <string.h>
-
-namespace mongo {
-
- class StringBuilder;
-
- class BufBuilder {
- public:
- BufBuilder(int initsize = 512) : size(initsize) {
- if ( size > 0 ) {
- data = (char *) malloc(size);
- assert(data);
- } else {
- data = 0;
- }
- l = 0;
- }
- ~BufBuilder() {
- kill();
- }
-
- void kill() {
- if ( data ) {
- free(data);
- data = 0;
- }
- }
-
- void reset( int maxSize = 0 ){
- l = 0;
- if ( maxSize && size > maxSize ){
- free(data);
- data = (char*)malloc(maxSize);
- size = maxSize;
- }
-
- }
-
- /* leave room for some stuff later */
- void skip(int n) {
- grow(n);
- }
-
- /* note this may be deallocated (realloced) if you keep writing. */
- char* buf() {
- return data;
- }
-
- /* assume ownership of the buffer - you must then free it */
- void decouple() {
- data = 0;
- }
-
- template<class T> void append(T j) {
- *((T*)grow(sizeof(T))) = j;
- }
- void append(short j) {
- append<short>(j);
- }
- void append(int j) {
- append<int>(j);
- }
- void append(unsigned j) {
- append<unsigned>(j);
- }
- void append(bool j) {
- append<bool>(j);
- }
- void append(double j) {
- append<double>(j);
- }
-
- void append(const void *src, size_t len) {
- memcpy(grow(len), src, len);
- }
-
- void append(const char *str) {
- append((void*) str, strlen(str)+1);
- }
-
- void append(const string &str) {
- append( (void *)str.c_str(), str.length() + 1 );
- }
-
- void append( int val , int padding ){
-
- }
-
- int len() const {
- return l;
- }
-
- void setlen( int newLen ){
- l = newLen;
- }
-
- private:
- /* returns the pre-grow write position */
- char* grow(int by) {
- int oldlen = l;
- l += by;
- if ( l > size ) {
- int a = size * 2;
- if ( a == 0 )
- a = 512;
- if ( l > a )
- a = l + 16 * 1024;
- assert( a < 64 * 1024 * 1024 );
- data = (char *) realloc(data, a);
- size= a;
- }
- return data + oldlen;
- }
-
- char *data;
- int l;
- int size;
-
- friend class StringBuilder;
- };
-
- class StringBuilder {
- public:
- StringBuilder( int initsize=256 )
- : _buf( initsize ){
- }
-
-#define SBNUM(val,maxSize,macro) \
- int prev = _buf.l; \
- int z = sprintf( _buf.grow(maxSize) , macro , (val) ); \
- _buf.l = prev + z; \
- return *this;
-
-
- StringBuilder& operator<<( double x ){
- SBNUM( x , 25 , "%g" );
- }
- StringBuilder& operator<<( int x ){
- SBNUM( x , 11 , "%d" );
- }
- StringBuilder& operator<<( unsigned x ){
- SBNUM( x , 11 , "%u" );
- }
- StringBuilder& operator<<( long x ){
- SBNUM( x , 22 , "%ld" );
- }
- StringBuilder& operator<<( unsigned long x ){
- SBNUM( x , 22 , "%lu" );
- }
- StringBuilder& operator<<( long long x ){
- SBNUM( x , 22 , "%lld" );
- }
- StringBuilder& operator<<( unsigned long long x ){
- SBNUM( x , 22 , "%llu" );
- }
- StringBuilder& operator<<( short x ){
- SBNUM( x , 8 , "%hd" );
- }
- StringBuilder& operator<<( char c ){
- _buf.grow( 1 )[0] = c;
- return *this;
- }
-
- void append( const char * str ){
- int x = strlen( str );
- memcpy( _buf.grow( x ) , str , x );
- }
- StringBuilder& operator<<( const char * str ){
- append( str );
- return *this;
- }
- StringBuilder& operator<<( const string& s ){
- append( s.c_str() );
- return *this;
- }
-
- // access
-
- void reset( int maxSize = 0 ){
- _buf.reset( maxSize );
- }
-
- string str(){
- return string(_buf.data, _buf.l);
- }
-
- private:
- BufBuilder _buf;
- };
-
-} // namespace mongo
diff --git a/util/concurrency/list.h b/util/concurrency/list.h
new file mode 100644
index 0000000..968ff4d
--- /dev/null
+++ b/util/concurrency/list.h
@@ -0,0 +1,81 @@
+// list.h
+
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#pragma once
+
+namespace mongo {
+
+/* this class uses a mutex for writes, but not for reads.
+ we can get fancier later...
+
+ struct Member : public List1<Member>::Base {
+ const char *host;
+ int port;
+ };
+ List1<Member> _members;
+ _members.head()->next();
+
+*/
+template<typename T>
+class List1 : boost::noncopyable {
+public:
+ /* next() and head() return 0 at end of list */
+
+ List1() : _head(0), _m("List1"), _orphans(0) { }
+
+ class Base {
+ friend class List1;
+ T *_next;
+ public:
+ T* next() const { return _next; }
+ };
+
+ T* head() const { return _head; }
+
+ void push(T* t) {
+ scoped_lock lk(_m);
+ t->_next = _head;
+ _head = t;
+ }
+
+ // intentionally leak.
+ void orphanAll() {
+ _head = 0;
+ }
+
+ /* t is not deleted, but is removed from the list. (orphaned) */
+ void orphan(T* t) {
+ scoped_lock lk(_m);
+ T *&prev = _head;
+ T *n = prev;
+ while( n != t ) {
+ prev = n->_next;
+ n = prev;
+ }
+ prev = t->_next;
+ if( ++_orphans > 500 )
+ log() << "warning orphans=" << _orphans << '\n';
+ }
+
+private:
+ T *_head;
+ mutex _m;
+ int _orphans;
+};
+
+};
diff --git a/util/concurrency/msg.h b/util/concurrency/msg.h
new file mode 100644
index 0000000..a5b07d3
--- /dev/null
+++ b/util/concurrency/msg.h
@@ -0,0 +1,61 @@
+// @file msg.h - interthread message passing
+
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,b
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#pragma once
+
+#include <deque>
+#include "task.h"
+
+namespace mongo {
+
+ namespace task {
+
+ typedef boost::function<void()> lam;
+
+ /** typical usage is: task::fork( new Server("threadname") ); */
+ class Server : public Task {
+ public:
+ /** send a message to the port */
+ void send(lam);
+
+ Server(string name) : _name(name), rq(false) { }
+ virtual ~Server() { }
+
+ /** send message but block until function completes */
+ void call(const lam&);
+
+ void requeue() { rq = true; }
+
+ protected:
+ /* REMINDER : for use in mongod, you will want to have this call Client::initThread(). */
+ virtual void starting() { }
+
+ private:
+ virtual bool initClient() { return true; }
+ virtual string name() { return _name; }
+ void doWork();
+ deque<lam> d;
+ boost::mutex m;
+ boost::condition c;
+ string _name;
+ bool rq;
+ };
+
+ }
+
+}
diff --git a/util/concurrency/mutex.h b/util/concurrency/mutex.h
new file mode 100644
index 0000000..9ab3960
--- /dev/null
+++ b/util/concurrency/mutex.h
@@ -0,0 +1,179 @@
+// @file mutex.h
+
+/* Copyright 2009 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <map>
+#include <set>
+
+namespace mongo {
+
+ extern bool __destroyingStatics;
+ class mutex;
+
+ // only used on _DEBUG builds:
+ class MutexDebugger {
+ typedef const char * mid; // mid = mutex ID
+ typedef map<mid,int> Preceeding;
+ map< mid, int > maxNest;
+ boost::thread_specific_ptr< Preceeding > us;
+ map< mid, set<mid> > followers;
+ boost::mutex &x;
+ unsigned magic;
+ public:
+ // set these to create an assert that
+ // b must never be locked before a
+ // so
+ // a.lock(); b.lock(); is fine
+ // b.lock(); alone is fine too
+ // only checked on _DEBUG builds.
+ string a,b;
+
+ void aBreakPoint(){}
+ void programEnding();
+ MutexDebugger();
+ void entering(mid m) {
+ if( magic != 0x12345678 ) return;
+
+ Preceeding *_preceeding = us.get();
+ if( _preceeding == 0 )
+ us.reset( _preceeding = new Preceeding() );
+ Preceeding &preceeding = *_preceeding;
+
+ if( a == m ) {
+ aBreakPoint();
+ if( preceeding[b.c_str()] ) {
+ cout << "mutex problem " << b << " was locked before " << a << endl;
+ assert(false);
+ }
+ }
+
+ preceeding[m]++;
+ if( preceeding[m] > 1 ) {
+ // recursive re-locking.
+ if( preceeding[m] > maxNest[m] )
+ maxNest[m] = preceeding[m];
+ return;
+ }
+
+ bool failed = false;
+ string err;
+ {
+ boost::mutex::scoped_lock lk(x);
+ followers[m];
+ for( Preceeding::iterator i = preceeding.begin(); i != preceeding.end(); i++ ) {
+ if( m != i->first && i->second > 0 ) {
+ followers[i->first].insert(m);
+ if( followers[m].count(i->first) != 0 ){
+ failed = true;
+ stringstream ss;
+ mid bad = i->first;
+ ss << "mutex problem" <<
+ "\n when locking " << m <<
+ "\n " << bad << " was already locked and should not be."
+ "\n set a and b above to debug.\n";
+ stringstream q;
+ for( Preceeding::iterator i = preceeding.begin(); i != preceeding.end(); i++ ) {
+ if( i->first != m && i->first != bad && i->second > 0 )
+ q << " " << i->first << '\n';
+ }
+ string also = q.str();
+ if( !also.empty() )
+ ss << "also locked before " << m << " in this thread (no particular order):\n" << also;
+ err = ss.str();
+ break;
+ }
+ }
+ }
+ }
+ if( failed ) {
+ cout << err << endl;
+ assert( 0 );
+ }
+ }
+ void leaving(mid m) {
+ if( magic != 0x12345678 ) return;
+ Preceeding& preceeding = *us.get();
+ preceeding[m]--;
+ if( preceeding[m] < 0 ) {
+ cout << "ERROR: lock count for " << m << " is " << preceeding[m] << endl;
+ assert( preceeding[m] >= 0 );
+ }
+ }
+ };
+ extern MutexDebugger mutexDebugger;
+
+ // If you create a local static instance of this class, that instance will be destroyed
+ // before all global static objects are destroyed, so __destroyingStatics will be set
+ // to true before the global static variables are destroyed.
+ class StaticObserver : boost::noncopyable {
+ public:
+ ~StaticObserver() { __destroyingStatics = true; }
+ };
+
+ // On pthread systems, it is an error to destroy a mutex while held. Static global
+ // mutexes may be held upon shutdown in our implementation, and this way we avoid
+ // destroying them.
+ class mutex : boost::noncopyable {
+ public:
+#if defined(_DEBUG)
+ const char *_name;
+#endif
+
+#if defined(_DEBUG)
+ mutex(const char *name)
+ : _name(name)
+#else
+ mutex(const char *)
+#endif
+ {
+ _m = new boost::mutex();
+ }
+ ~mutex() {
+ if( !__destroyingStatics ) {
+ delete _m;
+ }
+ }
+ class scoped_lock : boost::noncopyable {
+#if defined(_DEBUG)
+ mongo::mutex *mut;
+#endif
+ public:
+ scoped_lock( mongo::mutex &m ) : _l( m.boost() ) {
+#if defined(_DEBUG)
+ mut = &m;
+ mutexDebugger.entering(mut->_name);
+#endif
+ }
+ ~scoped_lock() {
+#if defined(_DEBUG)
+ mutexDebugger.leaving(mut->_name);
+#endif
+ }
+ boost::mutex::scoped_lock &boost() { return _l; }
+ private:
+ boost::mutex::scoped_lock _l;
+ };
+ private:
+ boost::mutex &boost() { return *_m; }
+ boost::mutex *_m;
+ };
+
+ typedef mutex::scoped_lock scoped_lock;
+ typedef boost::recursive_mutex::scoped_lock recursive_scoped_lock;
+
+}
diff --git a/util/mvar.h b/util/concurrency/mvar.h
index 7d17051..7d17051 100644
--- a/util/mvar.h
+++ b/util/concurrency/mvar.h
diff --git a/util/concurrency/readme.txt b/util/concurrency/readme.txt
new file mode 100644
index 0000000..6f308f5
--- /dev/null
+++ b/util/concurrency/readme.txt
@@ -0,0 +1,15 @@
+util/concurrency/ files
+
+list.h - a list class that is lock-free for reads
+rwlock.h - read/write locks (RWLock)
+msg.h - message passing between threads
+task.h - an abstraction around threads
+mutex.h - small enhancements that wrap boost::mutex
+thread_pool.h
+mvar.h
+ This is based on haskell's MVar synchronization primitive:
+ http://www.haskell.org/ghc/docs/latest/html/libraries/base-4.2.0.0/Control-Concurrent-MVar.html
+ It is a thread-safe queue that can hold at most one object.
+ You can also think of it as a box that can be either full or empty.
+value.h
+ Atomic wrapper for values/objects that are copy constructable / assignable
diff --git a/util/concurrency/rwlock.h b/util/concurrency/rwlock.h
new file mode 100644
index 0000000..75169b2
--- /dev/null
+++ b/util/concurrency/rwlock.h
@@ -0,0 +1,220 @@
+// rwlock.h
+
+/*
+ * Copyright (C) 2010 10gen Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include "mutex.h"
+
+#if BOOST_VERSION >= 103500
+ #define BOOST_RWLOCK
+#else
+
+ #if defined(_WIN32)
+ #error need boost >= 1.35 for windows
+ #endif
+
+ #include <pthread.h>
+
+#endif
+
+#ifdef BOOST_RWLOCK
+#include <boost/thread/shared_mutex.hpp>
+#undef assert
+#define assert MONGO_assert
+#endif
+
+namespace mongo {
+
+#ifdef BOOST_RWLOCK
+ class RWLock {
+ boost::shared_mutex _m;
+ public:
+#if defined(_DEBUG)
+ const char *_name;
+ RWLock(const char *name) : _name(name) { }
+#else
+ RWLock(const char *) { }
+#endif
+ void lock(){
+ _m.lock();
+#if defined(_DEBUG)
+ mutexDebugger.entering(_name);
+#endif
+ }
+ void unlock(){
+#if defined(_DEBUG)
+ mutexDebugger.leaving(_name);
+#endif
+ _m.unlock();
+ }
+
+ void lock_shared(){
+ _m.lock_shared();
+ }
+
+ void unlock_shared(){
+ _m.unlock_shared();
+ }
+
+ bool lock_shared_try( int millis ){
+ boost::system_time until = get_system_time();
+ until += boost::posix_time::milliseconds(millis);
+ if( _m.timed_lock_shared( until ) ) {
+ return true;
+ }
+ return false;
+ }
+
+ bool lock_try( int millis = 0 ){
+ boost::system_time until = get_system_time();
+ until += boost::posix_time::milliseconds(millis);
+ if( _m.timed_lock( until ) ) {
+#if defined(_DEBUG)
+ mutexDebugger.entering(_name);
+#endif
+ return true;
+ }
+ return false;
+ }
+
+
+ };
+#else
+ class RWLock {
+ pthread_rwlock_t _lock;
+
+ inline void check( int x ){
+ if( x == 0 )
+ return;
+ log() << "pthread rwlock failed: " << x << endl;
+ assert( x == 0 );
+ }
+
+ public:
+#if defined(_DEBUG)
+ const char *_name;
+ RWLock(const char *name) : _name(name) {
+#else
+ RWLock(const char *) {
+#endif
+ check( pthread_rwlock_init( &_lock , 0 ) );
+ }
+
+ ~RWLock(){
+ if ( ! __destroyingStatics ){
+ check( pthread_rwlock_destroy( &_lock ) );
+ }
+ }
+
+ void lock(){
+ check( pthread_rwlock_wrlock( &_lock ) );
+#if defined(_DEBUG)
+ mutexDebugger.entering(_name);
+#endif
+ }
+ void unlock(){
+#if defined(_DEBUG)
+ mutexDebugger.leaving(_name);
+#endif
+ check( pthread_rwlock_unlock( &_lock ) );
+ }
+
+ void lock_shared(){
+ check( pthread_rwlock_rdlock( &_lock ) );
+ }
+
+ void unlock_shared(){
+ check( pthread_rwlock_unlock( &_lock ) );
+ }
+
+ bool lock_shared_try( int millis ){
+ return _try( millis , false );
+ }
+
+ bool lock_try( int millis = 0 ){
+ if( _try( millis , true ) ) {
+#if defined(_DEBUG)
+ mutexDebugger.entering(_name);
+#endif
+ return true;
+ }
+ return false;
+ }
+
+ bool _try( int millis , bool write ){
+ while ( true ) {
+ int x = write ?
+ pthread_rwlock_trywrlock( &_lock ) :
+ pthread_rwlock_tryrdlock( &_lock );
+
+ if ( x <= 0 ) {
+ return true;
+ }
+
+ if ( millis-- <= 0 )
+ return false;
+
+ if ( x == EBUSY ){
+ sleepmillis(1);
+ continue;
+ }
+ check(x);
+ }
+
+ return false;
+ }
+
+ };
+
+
+#endif
+
+ class rwlock_try_write {
+ RWLock& _l;
+ public:
+ struct exception { };
+ rwlock_try_write(RWLock& l, int millis = 0) : _l(l) {
+ if( !l.lock_try(millis) ) throw exception();
+ }
+ ~rwlock_try_write() { _l.unlock(); }
+ };
+
+ /* scoped lock */
+ struct rwlock {
+ rwlock( const RWLock& lock , bool write , bool alreadyHaveLock = false )
+ : _lock( (RWLock&)lock ) , _write( write ){
+
+ if ( ! alreadyHaveLock ){
+ if ( _write )
+ _lock.lock();
+ else
+ _lock.lock_shared();
+ }
+ }
+
+ ~rwlock(){
+ if ( _write )
+ _lock.unlock();
+ else
+ _lock.unlock_shared();
+ }
+
+ RWLock& _lock;
+ bool _write;
+ };
+}
diff --git a/util/concurrency/spin_lock.cpp b/util/concurrency/spin_lock.cpp
new file mode 100644
index 0000000..b3e689a
--- /dev/null
+++ b/util/concurrency/spin_lock.cpp
@@ -0,0 +1,66 @@
+// spin_lock.cpp
+
+/**
+* Copyright (C) 2010 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include <time.h>
+#include "spin_lock.h"
+
+namespace mongo {
+
+ SpinLock::SpinLock() : _locked( false ){}
+
+ SpinLock::~SpinLock(){}
+
+ void SpinLock::lock(){
+#if defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4)
+
+ // fast path
+ if (!_locked && !__sync_lock_test_and_set(&_locked, true)) {
+ return;
+ }
+
+ // wait for lock
+ int wait = 1000;
+ while ((wait-- > 0) && (_locked)) {}
+
+ // if failed to grab lock, sleep
+ struct timespec t;
+ t.tv_sec = 0;
+ t.tv_nsec = 5000000;
+ while (__sync_lock_test_and_set(&_locked, true)) {
+ nanosleep(&t, NULL);
+ }
+#else
+
+ // WARNING "TODO Missing spin lock in this platform."
+
+#endif
+ }
+
+ void SpinLock::unlock(){
+#if defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4)
+
+ __sync_lock_release(&_locked);
+
+#else
+
+ // WARNING "TODO Missing spin lock in this platform."
+
+#endif
+ }
+
+} // namespace mongo
diff --git a/util/concurrency/spin_lock.h b/util/concurrency/spin_lock.h
new file mode 100644
index 0000000..110290d
--- /dev/null
+++ b/util/concurrency/spin_lock.h
@@ -0,0 +1,48 @@
+// spin_lock.h
+
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef CONCURRENCY_SPINLOCK_HEADER
+#define CONCURRENCY_SPINLOCK_HEADER
+
+namespace mongo {
+
+ /**
+ * BIG WARNING - COMPILES, BUT NOT READY FOR USE - BIG WARNING
+ *
+ * The spinlock currently requires late GCC support
+ * routines. Support for other platforms will be added soon.
+ */
+ class SpinLock{
+ public:
+ SpinLock();
+ ~SpinLock();
+
+ void lock();
+ void unlock();
+
+ private:
+ bool _locked;
+
+ // Non-copyable, non-assignable
+ SpinLock(SpinLock&);
+ SpinLock& operator=(SpinLock&);
+ };
+
+} // namespace mongo
+
+#endif // CONCURRENCY_SPINLOCK_HEADER
diff --git a/util/concurrency/task.cpp b/util/concurrency/task.cpp
new file mode 100644
index 0000000..05e43e5
--- /dev/null
+++ b/util/concurrency/task.cpp
@@ -0,0 +1,171 @@
+// @file task.cpp
+
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,b
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "pch.h"
+#include "task.h"
+#include "../goodies.h"
+#include "../unittest.h"
+#include "boost/thread/condition.hpp"
+
+namespace mongo {
+
+ namespace task {
+
+ /*void foo() {
+ boost::mutex m;
+ boost::mutex::scoped_lock lk(m);
+ boost::condition cond;
+ cond.wait(lk);
+ cond.notify_one();
+ }*/
+
+ Task::Task() {
+ n = 0;
+ repeat = 0;
+ deleteSelf = true;
+ }
+
+ void Task::halt() { repeat = 0; }
+
+ void Task::run() {
+ assert( n == 0 );
+ while( 1 ) {
+ n++;
+ try {
+ doWork();
+ }
+ catch(...) { }
+ if( repeat == 0 )
+ break;
+ sleepmillis(repeat);
+ if( inShutdown() )
+ break;
+ }
+ }
+
+ void Task::begin() {
+ go();
+ }
+
+ void fork(Task *t) {
+ t->begin();
+ }
+
+ void repeat(Task *t, unsigned millis) {
+ t->repeat = millis;
+ t->begin();
+ }
+
+ }
+}
+
+#include "msg.h"
+
+/* task::Server */
+
+namespace mongo {
+ namespace task {
+
+ /* to get back a return value */
+ struct Ret {
+ Ret() : done(false) { }
+ bool done;
+ boost::mutex m;
+ boost::condition c;
+ const lam *msg;
+ void f() {
+ (*msg)();
+ done = true;
+ c.notify_one();
+ }
+ };
+
+ void Server::call( const lam& msg ) {
+ Ret r;
+ r.msg = &msg;
+ lam f = boost::bind(&Ret::f, &r);
+ send(f);
+ {
+ boost::mutex::scoped_lock lk(r.m);
+ while( !r.done )
+ r.c.wait(lk);
+ }
+ }
+
+ void Server::send( lam msg ) {
+ {
+ boost::mutex::scoped_lock lk(m);
+ d.push_back(msg);
+ }
+ c.notify_one();
+ }
+
+ void Server::doWork() {
+ starting();
+ while( 1 ) {
+ lam f;
+ try {
+ boost::mutex::scoped_lock lk(m);
+ while( d.empty() )
+ c.wait(lk);
+ f = d.front();
+ d.pop_front();
+ }
+ catch(...) {
+ log() << "ERROR exception in Server:doWork?" << endl;
+ }
+ try {
+ f();
+ if( rq ) {
+ rq = false;
+ {
+ boost::mutex::scoped_lock lk(m);
+ d.push_back(f);
+ }
+ }
+ } catch(std::exception& e) {
+ log() << "Server::doWork() exception " << e.what() << endl;
+ } catch(...) {
+ log() << "Server::doWork() unknown exception!" << endl;
+ }
+ }
+ }
+
+ static Server *s;
+ static void abc(int i) {
+ cout << "Hello " << i << endl;
+ s->requeue();
+ }
+ class TaskUnitTest : public mongo::UnitTest {
+ public:
+ virtual void run() {
+ lam f = boost::bind(abc, 3);
+ //f();
+
+ s = new Server("unittest");
+ fork(s);
+ s->send(f);
+
+ sleepsecs(30);
+ cout <<" done" << endl;
+
+ }
+ }; // not running. taskunittest;
+
+ }
+}
diff --git a/util/concurrency/task.h b/util/concurrency/task.h
new file mode 100644
index 0000000..b3a2ece
--- /dev/null
+++ b/util/concurrency/task.h
@@ -0,0 +1,72 @@
+// @file task.h
+
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,b
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#pragma once
+
+#include "../background.h"
+
+namespace mongo {
+
+ namespace task {
+
+ /** abstraction around threads. simpler than BackgroundJob which is used behind the scenes.
+ allocate the Task dynamically. when the thread terminates, the Task object will delete itself.
+ */
+ class Task : private BackgroundJob {
+ protected:
+ virtual void doWork() = 0; // implement the task here.
+ virtual string name() = 0; // name the threada
+ public:
+ Task();
+
+ /** for a repeating task, stop after current invocation ends. can be called by other threads
+ as long as the Task is still in scope.
+ */
+ void halt();
+ private:
+ unsigned n, repeat;
+ friend void fork(Task* t);
+ friend void repeat(Task* t, unsigned millis);
+ virtual void run();
+ virtual void ending() { }
+ void begin();
+ };
+
+ /** run once */
+ void fork(Task *t);
+
+ /** run doWork() over and over, with a pause between runs of millis */
+ void repeat(Task *t, unsigned millis);
+
+ /*** Example ***
+ inline void sample() {
+ class Sample : public Task {
+ public:
+ int result;
+ virtual void doWork() { result = 1234; }
+ Sample() : result(0) { }
+ };
+ shared_ptr<Sample> q( new Sample() );
+ fork(q);
+ cout << q->result << endl; // could print 1234 or 0.
+ }
+ */
+
+ }
+
+}
diff --git a/util/concurrency/thread_pool.cpp b/util/concurrency/thread_pool.cpp
new file mode 100644
index 0000000..2caac1f
--- /dev/null
+++ b/util/concurrency/thread_pool.cpp
@@ -0,0 +1,138 @@
+/* threadpool.cpp
+*/
+
+/* Copyright 2009 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "pch.h"
+#include "thread_pool.h"
+#include "mvar.h"
+
+namespace mongo{
+ namespace threadpool{
+
+ // Worker thread
+ class Worker : boost::noncopyable {
+ public:
+ explicit Worker(ThreadPool& owner)
+ : _owner(owner)
+ , _is_done(true)
+ , _thread(boost::bind(&Worker::loop, this))
+ {}
+
+ // destructor will block until current operation is completed
+ // Acts as a "join" on this thread
+ ~Worker(){
+ _task.put(Task());
+ _thread.join();
+ }
+
+ void set_task(Task& func){
+ assert(!func.empty());
+ assert(_is_done);
+ _is_done = false;
+
+ _task.put(func);
+ }
+
+ private:
+ ThreadPool& _owner;
+ MVar<Task> _task;
+ bool _is_done; // only used for error detection
+ boost::thread _thread;
+
+ void loop(){
+ while (true) {
+ Task task = _task.take();
+ if (task.empty())
+ break; // ends the thread
+
+ try {
+ task();
+ } catch (std::exception e){
+ log() << "Unhandled exception in worker thread: " << e.what() << endl;;
+ } catch (...){
+ log() << "Unhandled non-exception in worker thread" << endl;
+ }
+ _is_done = true;
+ _owner.task_done(this);
+ }
+ }
+ };
+
+ ThreadPool::ThreadPool(int nThreads)
+ : _mutex("ThreadPool"), _tasksRemaining(0)
+ , _nThreads(nThreads)
+ {
+ scoped_lock lock(_mutex);
+ while (nThreads-- > 0){
+ Worker* worker = new Worker(*this);
+ _freeWorkers.push_front(worker);
+ }
+ }
+
+ ThreadPool::~ThreadPool(){
+ join();
+
+ assert(_tasks.empty());
+
+ // O(n) but n should be small
+ assert(_freeWorkers.size() == (unsigned)_nThreads);
+
+ while(!_freeWorkers.empty()){
+ delete _freeWorkers.front();
+ _freeWorkers.pop_front();
+ }
+ }
+
+ void ThreadPool::join(){
+ scoped_lock lock(_mutex);
+ while(_tasksRemaining){
+ _condition.wait(lock.boost());
+ }
+ }
+
+ void ThreadPool::schedule(Task task){
+ scoped_lock lock(_mutex);
+
+ _tasksRemaining++;
+
+ if (!_freeWorkers.empty()){
+ _freeWorkers.front()->set_task(task);
+ _freeWorkers.pop_front();
+ }else{
+ _tasks.push_back(task);
+ }
+ }
+
+ // should only be called by a worker from the worker thread
+ void ThreadPool::task_done(Worker* worker){
+ scoped_lock lock(_mutex);
+
+ if (!_tasks.empty()){
+ worker->set_task(_tasks.front());
+ _tasks.pop_front();
+ }else{
+ _freeWorkers.push_front(worker);
+ }
+
+ _tasksRemaining--;
+
+ if(_tasksRemaining == 0)
+ _condition.notify_all();
+ }
+
+ } //namespace threadpool
+} //namespace mongo
diff --git a/util/thread_pool.h b/util/concurrency/thread_pool.h
index d891d7d..f0fe8f1 100644
--- a/util/thread_pool.h
+++ b/util/concurrency/thread_pool.h
@@ -18,7 +18,7 @@
#include <boost/function.hpp>
#include <boost/bind.hpp>
#undef assert
-#define assert xassert
+#define assert MONGO_assert
namespace mongo {
@@ -29,7 +29,7 @@ namespace threadpool {
// exported to the mongo namespace
class ThreadPool : boost::noncopyable{
- public:
+ public:
explicit ThreadPool(int nThreads=8);
// blocks until all tasks are complete (tasks_remaining() == 0)
@@ -41,7 +41,6 @@ namespace threadpool {
// Also, new tasks could be scheduled after this returns.
void join();
-
// task will be copied a few times so make sure it's relatively cheap
void schedule(Task task);
@@ -58,10 +57,9 @@ namespace threadpool {
template<typename F, typename A, typename B, typename C, typename D, typename E>
void schedule(F f, A a, B b, C c, D d, E e){ schedule(boost::bind(f,a,b,c,d,e)); }
-
int tasks_remaining() { return _tasksRemaining; }
- private:
+ private:
mongo::mutex _mutex;
boost::condition _condition;
diff --git a/util/concurrency/value.h b/util/concurrency/value.h
new file mode 100644
index 0000000..dabeb95
--- /dev/null
+++ b/util/concurrency/value.h
@@ -0,0 +1,85 @@
+/* @file value.h
+ concurrency helpers Atomic<T> and DiagStr
+*/
+
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,b
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#pragma once
+
+namespace mongo {
+
+ extern mutex _atomicMutex;
+
+ /** atomic wrapper for a value. enters a mutex on each access. must
+ be copyable.
+ */
+ template<typename T>
+ class Atomic : boost::noncopyable {
+ T val;
+ public:
+ Atomic<T>() { }
+
+ void operator=(const T& a) {
+ scoped_lock lk(_atomicMutex);
+ val = a; }
+
+ operator T() const {
+ scoped_lock lk(_atomicMutex);
+ return val; }
+
+ /** example:
+ Atomic<int> q;
+ ...
+ {
+ Atomic<int>::tran t(q);
+ if( q.ref() > 0 )
+ q.ref()--;
+ }
+ */
+ class tran : private scoped_lock {
+ Atomic<T>& _a;
+ public:
+ tran(Atomic<T>& a) : scoped_lock(_atomicMutex), _a(a) { }
+ T& ref() { return _a.val; }
+ };
+ };
+
+ /** this string COULD be mangled but with the double buffering, assuming writes
+ are infrequent, it's unlikely. thus, this is reasonable for lockless setting of
+ diagnostic strings, where their content isn't critical.
+ */
+ class DiagStr {
+ char buf1[256];
+ char buf2[256];
+ char *p;
+ public:
+ DiagStr() {
+ memset(buf1, 0, 256);
+ memset(buf2, 0, 256);
+ p = buf1;
+ }
+
+ const char * get() const { return p; }
+
+ void set(const char *s) {
+ char *q = (p==buf1) ? buf2 : buf1;
+ strncpy(q, s, 255);
+ p = q;
+ }
+ };
+
+}
diff --git a/util/concurrency/vars.cpp b/util/concurrency/vars.cpp
new file mode 100644
index 0000000..2b46946
--- /dev/null
+++ b/util/concurrency/vars.cpp
@@ -0,0 +1,52 @@
+// vars.cpp
+
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,b
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "pch.h"
+#include "value.h"
+#include "mutex.h"
+
+namespace mongo {
+
+ mutex _atomicMutex("_atomicMutex");
+ MutexDebugger mutexDebugger;
+
+ MutexDebugger::MutexDebugger() :
+ x( *(new boost::mutex()) ), magic(0x12345678) {
+ // optional way to debug lock order
+ /*
+ a = "a_lock";
+ b = "b_lock";
+ */
+ }
+
+ void MutexDebugger::programEnding() {
+ if( logLevel>=1 && followers.size() ) {
+ std::cout << followers.size() << " mutexes in program" << endl;
+ for( map< mid, set<mid> >::iterator i = followers.begin(); i != followers.end(); i++ ) {
+ cout << i->first;
+ if( maxNest[i->first] > 1 )
+ cout << " maxNest:" << maxNest[i->first];
+ cout << '\n';
+ for( set<mid>::iterator j = i->second.begin(); j != i->second.end(); j++ )
+ cout << " " << *j << '\n';
+ }
+ cout.flush();
+ }
+ }
+
+}
diff --git a/util/debug_util.cpp b/util/debug_util.cpp
index 9c2f5dc..f0a916d 100644
--- a/util/debug_util.cpp
+++ b/util/debug_util.cpp
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-#include "stdafx.h"
+#include "pch.h"
#include "../db/cmdline.h"
#include "../db/jsobj.h"
diff --git a/util/debug_util.h b/util/debug_util.h
index 6d633c5..7686ecc 100644
--- a/util/debug_util.h
+++ b/util/debug_util.h
@@ -19,7 +19,7 @@
#ifndef _WIN32
#include <signal.h>
-#endif // ndef _WIN32
+#endif
namespace mongo {
@@ -39,42 +39,47 @@ namespace mongo {
char string[400];
} *OWS;
-// for now, running on win32 means development not production --
-// use this to log things just there.
-#if defined(_WIN32)
-#define WIN if( 1 )
-#else
-#define WIN if( 0 )
-#endif
-
#if defined(_DEBUG)
-#define DEV if( 1 )
+ enum {DEBUG_BUILD = 1};
#else
-#define DEV if( 0 )
+ enum {DEBUG_BUILD = 0};
#endif
-#define DEBUGGING if( 0 )
+#define MONGO_DEV if( DEBUG_BUILD )
+#define DEV MONGO_DEV
+
+#define MONGO_DEBUGGING if( 0 )
+#define DEBUGGING MONGO_DEBUGGING
// The following declare one unique counter per enclosing function.
// NOTE The implementation double-increments on a match, but we don't really care.
-#define SOMETIMES( occasion, howOften ) for( static unsigned occasion = 0; ++occasion % howOften == 0; )
-#define OCCASIONALLY SOMETIMES( occasionally, 16 )
-#define RARELY SOMETIMES( rarely, 128 )
-#define ONCE for( static bool undone = true; undone; undone = false )
+#define MONGO_SOMETIMES( occasion, howOften ) for( static unsigned occasion = 0; ++occasion % howOften == 0; )
+#define SOMETIMES MONGO_SOMETIMES
+
+#define MONGO_OCCASIONALLY SOMETIMES( occasionally, 16 )
+#define OCCASIONALLY MONGO_OCCASIONALLY
+
+#define MONGO_RARELY SOMETIMES( rarely, 128 )
+#define RARELY MONGO_RARELY
+
+#define MONGO_ONCE for( static bool undone = true; undone; undone = false )
+#define ONCE MONGO_ONCE
#if defined(_WIN32)
-#define strcasecmp _stricmp
+ inline int strcasecmp(const char* s1, const char* s2) {return _stricmp(s1, s2);}
#endif
// Sets SIGTRAP handler to launch GDB
// Noop unless on *NIX and compiled with _DEBUG
void setupSIGTRAPforGDB();
-#if defined(_WIN32)
- inline void breakpoint() {} //noop
-#else // defined(_WIN32)
- // code to raise a breakpoint in GDB
+ extern int tlogLevel;
+
inline void breakpoint(){
+ if ( tlogLevel < 0 )
+ return;
+#ifndef _WIN32
+ // code to raise a breakpoint in GDB
ONCE {
//prevent SIGTRAP from crashing the program if default action is specified and we are not in gdb
struct sigaction current;
@@ -83,10 +88,11 @@ namespace mongo {
signal(SIGTRAP, SIG_IGN);
}
}
-
+
raise(SIGTRAP);
+#endif
}
-#endif // defined(_WIN32)
+
// conditional breakpoint
inline void breakif(bool test){
diff --git a/util/embedded_builder.h b/util/embedded_builder.h
index d945bfb..8ca47e5 100644
--- a/util/embedded_builder.h
+++ b/util/embedded_builder.h
@@ -18,6 +18,7 @@
#pragma once
namespace mongo {
+
// utility class for assembling hierarchical objects
class EmbeddedBuilder {
public:
@@ -49,7 +50,7 @@ namespace mongo {
return;
}
prepareContext( name );
- back()->appendAs( e, name.c_str() );
+ back()->appendAs( e, name );
}
BufBuilder &subarrayStartAs( string name ) {
prepareContext( name );
diff --git a/util/file.h b/util/file.h
index 347e2d6..0302290 100644
--- a/util/file.h
+++ b/util/file.h
@@ -27,6 +27,8 @@
#include <windows.h>
#endif
+#include "text.h"
+
namespace mongo {
#ifndef __sunos__
@@ -47,7 +49,6 @@ public:
#if defined(_WIN32)
#include <io.h>
-std::wstring toWideString(const char *s);
class File : public FileInterface {
HANDLE fd;
@@ -68,9 +69,9 @@ public:
fd = INVALID_HANDLE_VALUE;
}
void open(const char *filename, bool readOnly=false ) {
- std::wstring filenamew = toWideString(filename);
fd = CreateFile(
- filenamew.c_str(), ( readOnly ? 0 : GENERIC_WRITE ) | GENERIC_READ, FILE_SHARE_READ,
+ toNativeString(filename).c_str(),
+ ( readOnly ? 0 : GENERIC_WRITE ) | GENERIC_READ, FILE_SHARE_READ,
NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
if( !is_open() ) {
out() << "CreateFile failed " << filename << endl;
@@ -118,7 +119,7 @@ class File : public FileInterface {
void err(bool ok) {
if( !ok && !_bad ) {
_bad = true;
- log() << "File I/O " << OUTPUT_ERRNO << '\n';
+ log() << "File I/O " << errnoWithDescription() << '\n';
}
}
public:
@@ -137,9 +138,11 @@ public:
#endif
void open(const char *filename, bool readOnly=false ) {
- fd = ::open(filename, O_CREAT | ( readOnly ? 0 : O_RDWR ) | O_NOATIME, S_IRUSR | S_IWUSR);
+ fd = ::open(filename,
+ O_CREAT | ( readOnly ? 0 : ( O_RDWR | O_NOATIME ) ) ,
+ S_IRUSR | S_IWUSR);
if ( fd <= 0 ) {
- out() << "couldn't open " << filename << ' ' << OUTPUT_ERRNO << endl;
+ out() << "couldn't open " << filename << ' ' << errnoWithDescription() << endl;
return;
}
_bad = false;
diff --git a/util/file_allocator.h b/util/file_allocator.h
index 93b2b1c..b0267d9 100644
--- a/util/file_allocator.h
+++ b/util/file_allocator.h
@@ -1,4 +1,4 @@
-//file_allocator.h
+// @file file_allocator.h
/* Copyright 2009 10gen Inc.
*
@@ -15,10 +15,10 @@
* limitations under the License.
*/
-#include "../stdafx.h"
+#include "../pch.h"
#include <fcntl.h>
#include <errno.h>
-#if defined(__freebsd__)
+#if defined(__freebsd__) || defined(__openbsd__)
#include <sys/stat.h>
#endif
@@ -38,7 +38,7 @@ namespace mongo {
*/
public:
#if !defined(_WIN32)
- FileAllocator() : failed_() {}
+ FileAllocator() : pendingMutex_("FileAllocator"), failed_() {}
#endif
void start() {
#if !defined(_WIN32)
@@ -106,6 +106,53 @@ namespace mongo {
#endif
}
+ static void ensureLength( int fd , long size ){
+
+#if defined(_WIN32)
+ // we don't zero on windows
+ // TODO : we should to avoid fragmentation
+#else
+
+#if defined(__linux__)
+ int ret = posix_fallocate(fd,0,size);
+ if ( ret == 0 )
+ return;
+
+ log() << "posix_fallocate failed: " << errnoWithDescription( ret ) << " falling back" << endl;
+#endif
+
+ off_t filelen = lseek(fd, 0, SEEK_END);
+ if ( filelen < size ) {
+ if (filelen != 0) {
+ stringstream ss;
+ ss << "failure creating new datafile; lseek failed for fd " << fd << " with errno: " << errnoWithDescription();
+ massert( 10440 , ss.str(), filelen == 0 );
+ }
+ // Check for end of disk.
+ massert( 10441 , "Unable to allocate file of desired size",
+ size - 1 == lseek(fd, size - 1, SEEK_SET) );
+ massert( 10442 , "Unable to allocate file of desired size",
+ 1 == write(fd, "", 1) );
+ lseek(fd, 0, SEEK_SET);
+
+ const long z = 256 * 1024;
+ const boost::scoped_array<char> buf_holder (new char[z]);
+ char* buf = buf_holder.get();
+ memset(buf, 0, z);
+ long left = size;
+ while ( left > 0 ) {
+ long towrite = left;
+ if ( towrite > z )
+ towrite = z;
+
+ int written = write( fd , buf , towrite );
+ massert( 10443 , errnoWithPrefix("write failed" ), written > 0 );
+ left -= written;
+ }
+ }
+#endif
+ }
+
private:
#if !defined(_WIN32)
void checkFailure() {
@@ -161,42 +208,26 @@ namespace mongo {
long fd = open(name.c_str(), O_CREAT | O_RDWR | O_NOATIME, S_IRUSR | S_IWUSR);
if ( fd <= 0 ) {
stringstream ss;
- ss << "couldn't open " << name << ' ' << OUTPUT_ERRNO;
+ ss << "couldn't open " << name << ' ' << errnoWithDescription();
massert( 10439 , ss.str(), fd <= 0 );
}
#if defined(POSIX_FADV_DONTNEED)
if( posix_fadvise(fd, 0, size, POSIX_FADV_DONTNEED) ) {
- log() << "warning: posix_fadvise fails " << name << ' ' << OUTPUT_ERRNO << endl;
+ log() << "warning: posix_fadvise fails " << name << ' ' << errnoWithDescription() << endl;
}
#endif
-
+
+ Timer t;
+
/* make sure the file is the full desired length */
- off_t filelen = lseek(fd, 0, SEEK_END);
- if ( filelen < size ) {
- massert( 10440 , "failure creating new datafile", filelen == 0 );
- // Check for end of disk.
- massert( 10441 , "Unable to allocate file of desired size",
- size - 1 == lseek(fd, size - 1, SEEK_SET) );
- massert( 10442 , "Unable to allocate file of desired size",
- 1 == write(fd, "", 1) );
- lseek(fd, 0, SEEK_SET);
- Timer t;
- long z = 256 * 1024;
- char buf[z];
- memset(buf, 0, z);
- long left = size;
- while ( left > 0 ) {
- long towrite = left;
- if ( towrite > z )
- towrite = z;
-
- int written = write( fd , buf , towrite );
- massert( 10443 , errnostring("write failed" ), written > 0 );
- left -= written;
- }
- log() << "done allocating datafile " << name << ", size: " << size/1024/1024 << "MB, took " << ((double)t.millis())/1000.0 << " secs" << endl;
- }
+ ensureLength( fd , size );
+
+ log() << "done allocating datafile " << name << ", "
+ << "size: " << size/1024/1024 << "MB, "
+ << " took " << ((double)t.millis())/1000.0 << " secs"
+ << endl;
+
close( fd );
} catch ( ... ) {
diff --git a/util/goodies.h b/util/goodies.h
index cd5423b..1dfabdc 100644
--- a/util/goodies.h
+++ b/util/goodies.h
@@ -1,4 +1,4 @@
-// goodies.h
+// @file goodies.h
// miscellaneous junk
/* Copyright 2009 10gen Inc.
@@ -18,13 +18,22 @@
#pragma once
-#if defined(_WIN32)
-# include <windows.h>
-#endif
+#include "../bson/util/misc.h"
+#include "concurrency/mutex.h"
namespace mongo {
-#if !defined(_WIN32) && !defined(NOEXECINFO) && !defined(__freebsd__) && !defined(__sun__)
+ void setThreadName(const char * name);
+ string getThreadName();
+
+ template<class T>
+ inline string ToString(const T& t) {
+ stringstream s;
+ s << t;
+ return s.str();
+ }
+
+#if !defined(_WIN32) && !defined(NOEXECINFO) && !defined(__freebsd__) && !defined(__openbsd__) && !defined(__sun__)
} // namespace mongo
@@ -105,15 +114,14 @@ namespace mongo {
}
// PRINT(2+2); prints "2+2: 4"
-#define PRINT(x) cout << #x ": " << (x) << endl
+#define MONGO_PRINT(x) cout << #x ": " << (x) << endl
+#define PRINT MONGO_PRINT
// PRINTFL; prints file:line
-#define PRINTFL cout << __FILE__ ":" << __LINE__ << endl
-
-#undef yassert
+#define MONGO_PRINTFL cout << __FILE__ ":" << __LINE__ << endl
+#define PRINTFL MONGO_PRINTFL
#undef assert
-#define assert xassert
-#define yassert 1
+#define assert MONGO_assert
struct WrappingInt {
WrappingInt() {
@@ -139,22 +147,6 @@ namespace mongo {
}
};
-} // namespace mongo
-
-#include <ctime>
-
-namespace mongo {
-
- inline void time_t_to_String(time_t t, char *buf) {
-#if defined(_WIN32)
- ctime_s(buf, 64, &t);
-#else
- ctime_r(&t, buf);
-#endif
- buf[24] = 0; // don't want the \n
- }
-
-
inline void time_t_to_Struct(time_t t, struct tm * buf , bool local = false ) {
#if defined(_WIN32)
if ( local )
@@ -169,12 +161,26 @@ namespace mongo {
#endif
}
+ // uses ISO 8601 dates without trailing Z
+ // colonsOk should be false when creating filenames
+ inline string terseCurrentTime(bool colonsOk=true){
+ struct tm t;
+ time_t_to_Struct( time(0) , &t );
+ const char* fmt = (colonsOk ? "%Y-%m-%dT%H:%M:%S" : "%Y-%m-%dT%H-%M-%S");
+ char buf[32];
+ assert(strftime(buf, sizeof(buf), fmt, &t) == 19);
+ return buf;
+ }
-#define asctime _asctime_not_threadsafe_
-#define gmtime _gmtime_not_threadsafe_
-#define localtime _localtime_not_threadsafe_
-#define ctime _ctime_is_not_threadsafe_
+#define MONGO_asctime _asctime_not_threadsafe_
+#define asctime MONGO_asctime
+#define MONGO_gmtime _gmtime_not_threadsafe_
+#define gmtime MONGO_gmtime
+#define MONGO_localtime _localtime_not_threadsafe_
+#define localtime MONGO_localtime
+#define MONGO_ctime _ctime_is_not_threadsafe_
+#define ctime MONGO_ctime
#if defined(_WIN32) || defined(__sunos__)
inline void sleepsecs(int s) {
@@ -183,24 +189,24 @@ namespace mongo {
xt.sec += s;
boost::thread::sleep(xt);
}
- inline void sleepmillis(int s) {
+ inline void sleepmillis(long long s) {
boost::xtime xt;
boost::xtime_get(&xt, boost::TIME_UTC);
- xt.sec += ( s / 1000 );
- xt.nsec += ( s % 1000 ) * 1000000;
+ xt.sec += (int)( s / 1000 );
+ xt.nsec += (int)(( s % 1000 ) * 1000000);
if ( xt.nsec >= 1000000000 ) {
xt.nsec -= 1000000000;
xt.sec++;
}
boost::thread::sleep(xt);
}
- inline void sleepmicros(int s) {
+ inline void sleepmicros(long long s) {
if ( s <= 0 )
return;
boost::xtime xt;
boost::xtime_get(&xt, boost::TIME_UTC);
- xt.sec += ( s / 1000000 );
- xt.nsec += ( s % 1000000 ) * 1000;
+ xt.sec += (int)( s / 1000000 );
+ xt.nsec += (int)(( s % 1000000 ) * 1000);
if ( xt.nsec >= 1000000000 ) {
xt.nsec -= 1000000000;
xt.sec++;
@@ -216,22 +222,23 @@ namespace mongo {
cout << "nanosleep failed" << endl;
}
}
- inline void sleepmicros(int s) {
+ inline void sleepmicros(long long s) {
if ( s <= 0 )
return;
struct timespec t;
t.tv_sec = (int)(s / 1000000);
- t.tv_nsec = s % 1000000;
- if ( nanosleep( &t , 0 ) ){
+ t.tv_nsec = 1000 * ( s % 1000000 );
+ struct timespec out;
+ if ( nanosleep( &t , &out ) ){
cout << "nanosleep failed" << endl;
}
}
- inline void sleepmillis(int s) {
+ inline void sleepmillis(long long s) {
sleepmicros( s * 1000 );
}
#endif
-// note this wraps
+ // note this wraps
inline int tdiff(unsigned told, unsigned tnew) {
return WrappingInt::diff(tnew, told);
}
@@ -242,15 +249,6 @@ namespace mongo {
return (xt.sec & 0xfffff) * 1000 + t;
}
- struct Date_t {
- // TODO: make signed (and look for related TODO's)
- unsigned long long millis;
- Date_t(): millis(0) {}
- Date_t(unsigned long long m): millis(m) {}
- operator unsigned long long&() { return millis; }
- operator const unsigned long long&() const { return millis; }
- };
-
inline Date_t jsTime() {
boost::xtime xt;
boost::xtime_get(&xt, boost::TIME_UTC);
@@ -273,44 +271,7 @@ namespace mongo {
unsigned secs = xt.sec % 1024;
return secs*1000000 + t;
}
- using namespace boost;
-
- extern bool __destroyingStatics;
-
- // If you create a local static instance of this class, that instance will be destroyed
- // before all global static objects are destroyed, so __destroyingStatics will be set
- // to true before the global static variables are destroyed.
- class StaticObserver : boost::noncopyable {
- public:
- ~StaticObserver() { __destroyingStatics = true; }
- };
-
- // On pthread systems, it is an error to destroy a mutex while held. Static global
- // mutexes may be held upon shutdown in our implementation, and this way we avoid
- // destroying them.
- class mutex : boost::noncopyable {
- public:
- mutex() { new (_buf) boost::mutex(); }
- ~mutex() {
- if( !__destroyingStatics ) {
- boost().boost::mutex::~mutex();
- }
- }
- class scoped_lock : boost::noncopyable {
- public:
- scoped_lock( mongo::mutex &m ) : _l( m.boost() ) {}
- boost::mutex::scoped_lock &boost() { return _l; }
- private:
- boost::mutex::scoped_lock _l;
- };
- private:
- boost::mutex &boost() { return *( boost::mutex * )( _buf ); }
- char _buf[ sizeof( boost::mutex ) ];
- };
- typedef mongo::mutex::scoped_lock scoped_lock;
- typedef boost::recursive_mutex::scoped_lock recursive_scoped_lock;
-
// simple scoped timer
class Timer {
public:
@@ -320,17 +281,17 @@ namespace mongo {
Timer( unsigned long long start ) {
old = start;
}
- int seconds(){
+ int seconds() const {
return (int)(micros() / 1000000);
}
- int millis() {
+ int millis() const {
return (long)(micros() / 1000);
}
- unsigned long long micros() {
+ unsigned long long micros() const {
unsigned long long n = curTimeMicros64();
return n - old;
}
- unsigned long long micros(unsigned long long & n) { // returns cur time in addition to timer result
+ unsigned long long micros(unsigned long long & n) const { // returns cur time in addition to timer result
n = curTimeMicros64();
return n - old;
}
@@ -364,6 +325,7 @@ namespace mongo {
if ( strlen(str) < l ) return false;
return strncmp(str, prefix, l) == 0;
}
+ inline bool startsWith(string s, string p) { return startsWith(s.c_str(), p.c_str()); }
inline bool endsWith(const char *p, const char *suffix) {
size_t a = strlen(p);
@@ -372,12 +334,6 @@ namespace mongo {
return strcmp(p + a - b, suffix) == 0;
}
-} // namespace mongo
-
-#include "boost/detail/endian.hpp"
-
-namespace mongo {
-
inline unsigned long swapEndian(unsigned long x) {
return
((x & 0xff) << 24) |
@@ -395,15 +351,6 @@ namespace mongo {
return swapEndian(x);
}
#endif
-
- // Like strlen, but only scans up to n bytes.
- // Returns -1 if no '0' found.
- inline int strnlen( const char *s, int n ) {
- for( int i = 0; i < n; ++i )
- if ( !s[ i ] )
- return i;
- return -1;
- }
#if !defined(_WIN32)
typedef int HANDLE;
@@ -446,7 +393,7 @@ namespace mongo {
boost::thread_specific_ptr<T> _val;
};
- class ProgressMeter {
+ class ProgressMeter : boost::noncopyable {
public:
ProgressMeter( long long total , int secondsBetween = 3 , int checkInterval = 100 ){
reset( total , secondsBetween , checkInterval );
@@ -513,6 +460,10 @@ namespace mongo {
buf << _done << "/" << _total << " " << (_done*100)/_total << "%";
return buf.str();
}
+
+ bool operator==( const ProgressMeter& other ) const {
+ return this == &other;
+ }
private:
bool _active;
@@ -526,9 +477,39 @@ namespace mongo {
int _lastTime;
};
+ class ProgressMeterHolder : boost::noncopyable {
+ public:
+ ProgressMeterHolder( ProgressMeter& pm )
+ : _pm( pm ){
+ }
+
+ ~ProgressMeterHolder(){
+ _pm.finished();
+ }
+
+ ProgressMeter* operator->(){
+ return &_pm;
+ }
+
+ bool hit( int n = 1 ){
+ return _pm.hit( n );
+ }
+
+ void finished(){
+ _pm.finished();
+ }
+
+ bool operator==( const ProgressMeter& other ){
+ return _pm == other;
+ }
+
+ private:
+ ProgressMeter& _pm;
+ };
+
class TicketHolder {
public:
- TicketHolder( int num ){
+ TicketHolder( int num ) : _mutex("TicketHolder") {
_outof = num;
_num = num;
}
@@ -611,7 +592,7 @@ namespace mongo {
_buf = 0;
}
- operator string() const {
+ string toString() const {
string s = _buf;
return s;
}
@@ -634,11 +615,11 @@ namespace mongo {
}
bool operator!=( const char * str ) const {
- return strcmp( _buf , str );
+ return strcmp( _buf , str ) != 0;
}
bool empty() const {
- return _buf[0] == 0;
+ return _buf == 0 || _buf[0] == 0;
}
private:
@@ -651,6 +632,20 @@ namespace mongo {
inline bool isNumber( char c ) {
return c >= '0' && c <= '9';
}
+
+ inline unsigned stringToNum(const char *str) {
+ unsigned x = 0;
+ const char *p = str;
+ while( 1 ) {
+ if( !isNumber(*p) ) {
+ if( *p == 0 && p != str )
+ break;
+ throw 0;
+ }
+ x = x * 10 + *p++ - '0';
+ }
+ return x;
+ }
// for convenience, '{' is greater than anything and stops number parsing
inline int lexNumCmp( const char *s1, const char *s2 ) {
@@ -703,5 +698,46 @@ namespace mongo {
return -1;
return 0;
}
-
+
+ /** A generic pointer type for function arguments.
+ * It will convert from any pointer type except auto_ptr.
+ * Semantics are the same as passing the pointer returned from get()
+ * const ptr<T> => T * const
+ * ptr<const T> => T const * or const T*
+ */
+ template <typename T>
+ struct ptr{
+
+ ptr() : _p(NULL) {}
+
+ // convert to ptr<T>
+ ptr(T* p) : _p(p) {} // needed for NULL
+ template<typename U> ptr(U* p) : _p(p) {}
+ template<typename U> ptr(const ptr<U>& p) : _p(p) {}
+ template<typename U> ptr(const boost::shared_ptr<U>& p) : _p(p.get()) {}
+ template<typename U> ptr(const boost::scoped_ptr<U>& p) : _p(p.get()) {}
+ //template<typename U> ptr(const auto_ptr<U>& p) : _p(p.get()) {}
+
+ // assign to ptr<T>
+ ptr& operator= (T* p) { _p = p; return *this; } // needed for NULL
+ template<typename U> ptr& operator= (U* p) { _p = p; return *this; }
+ template<typename U> ptr& operator= (const ptr<U>& p) { _p = p; return *this; }
+ template<typename U> ptr& operator= (const boost::shared_ptr<U>& p) { _p = p.get(); return *this; }
+ template<typename U> ptr& operator= (const boost::scoped_ptr<U>& p) { _p = p.get(); return *this; }
+ //template<typename U> ptr& operator= (const auto_ptr<U>& p) { _p = p.get(); return *this; }
+
+ // use
+ T* operator->() const { return _p; }
+ T& operator*() const { return *_p; }
+
+ // convert from ptr<T>
+ operator T* () const { return _p; }
+
+ private:
+ T* _p;
+ };
+
+ /** Hmmmm */
+ using namespace boost;
+
} // namespace mongo
diff --git a/util/hashtab.h b/util/hashtab.h
index d46591c..16c5483 100644
--- a/util/hashtab.h
+++ b/util/hashtab.h
@@ -22,7 +22,7 @@
#pragma once
-#include "../stdafx.h"
+#include "../pch.h"
#include <map>
namespace mongo {
@@ -36,7 +36,8 @@ namespace mongo {
template <
class Key,
- class Type
+ class Type,
+ class PTR
>
class HashTable : boost::noncopyable {
public:
@@ -51,10 +52,15 @@ namespace mongo {
void setUnused() {
hash = 0;
}
- } *nodes;
+ };
+ PTR _buf;
int n;
int maxChain;
+ Node& nodes(int i) {
+ return *((Node*) _buf.at(i * sizeof(Node), sizeof(Node)));
+ }
+
int _find(const Key& k, bool& found) {
found = false;
int h = k.hash();
@@ -63,12 +69,12 @@ namespace mongo {
int chain = 0;
int firstNonUsed = -1;
while ( 1 ) {
- if ( !nodes[i].inUse() ) {
+ if ( !nodes(i).inUse() ) {
if ( firstNonUsed < 0 )
firstNonUsed = i;
}
- if ( nodes[i].hash == h && nodes[i].k == k ) {
+ if ( nodes(i).hash == h && nodes(i).k == k ) {
if ( chain >= 200 )
out() << "warning: hashtable " << name << " long chain " << endl;
found = true;
@@ -92,24 +98,28 @@ namespace mongo {
public:
/* buf must be all zeroes on initialization. */
- HashTable(void *buf, int buflen, const char *_name) : name(_name) {
+ HashTable(PTR buf, int buflen, const char *_name) : name(_name) {
int m = sizeof(Node);
// out() << "hashtab init, buflen:" << buflen << " m:" << m << endl;
n = buflen / m;
if ( (n & 1) == 0 )
n--;
maxChain = (int) (n * 0.05);
- nodes = (Node *) buf;
+ _buf = buf;
+ //nodes = (Node *) buf;
+
+ if ( sizeof(Node) != 628 ){
+ out() << "HashTable() " << _name << " sizeof(node):" << sizeof(Node) << " n:" << n << " sizeof(Key): " << sizeof(Key) << " sizeof(Type):" << sizeof(Type) << endl;
+ assert( sizeof(Node) == 628 );
+ }
- assert( sizeof(Node) == 628 );
- //out() << "HashTable() " << _name << " sizeof(node):" << sizeof(Node) << " n:" << n << endl;
}
Type* get(const Key& k) {
bool found;
int i = _find(k, found);
if ( found )
- return &nodes[i].value;
+ return &nodes(i).value;
return 0;
}
@@ -117,8 +127,9 @@ namespace mongo {
bool found;
int i = _find(k, found);
if ( i >= 0 && found ) {
- nodes[i].k.kill();
- nodes[i].setUnused();
+ Node& n = nodes(i);
+ n.k.kill();
+ n.setUnused();
}
}
/*
@@ -136,24 +147,34 @@ namespace mongo {
int i = _find(k, found);
if ( i < 0 )
return false;
+ Node& n = nodes(i);
if ( !found ) {
- nodes[i].k = k;
- nodes[i].hash = k.hash();
+ n.k = k;
+ n.hash = k.hash();
}
else {
- assert( nodes[i].hash == k.hash() );
+ assert( n.hash == k.hash() );
}
- nodes[i].value = value;
+ n.value = value;
return true;
}
typedef void (*IteratorCallback)( const Key& k , Type& v );
-
void iterAll( IteratorCallback callback ){
for ( int i=0; i<n; i++ ){
- if ( ! nodes[i].inUse() )
+ if ( ! nodes(i).inUse() )
+ continue;
+ callback( nodes(i).k , nodes(i).value );
+ }
+ }
+
+ // TODO: should probably use boost::bind for this, but didn't want to look at it
+ typedef void (*IteratorCallback2)( const Key& k , Type& v , void * extra );
+ void iterAll( IteratorCallback2 callback , void * extra ){
+ for ( int i=0; i<n; i++ ){
+ if ( ! nodes(i).inUse() )
continue;
- callback( nodes[i].k , nodes[i].value );
+ callback( nodes(i).k , nodes(i).value , extra );
}
}
diff --git a/util/hex.h b/util/hex.h
index cef3e80..45a08f4 100644
--- a/util/hex.h
+++ b/util/hex.h
@@ -32,4 +32,36 @@ namespace mongo {
inline char fromHex( const char *c ) {
return ( fromHex( c[ 0 ] ) << 4 ) | fromHex( c[ 1 ] );
}
+
+ inline string toHex(const void* inRaw, int len){
+ static const char hexchars[] = "0123456789ABCDEF";
+
+ StringBuilder out;
+ const char* in = reinterpret_cast<const char*>(inRaw);
+ for (int i=0; i<len; ++i){
+ char c = in[i];
+ char hi = hexchars[(c & 0xF0) >> 4];
+ char lo = hexchars[(c & 0x0F)];
+
+ out << hi << lo;
+ }
+
+ return out.str();
+ }
+
+ inline string toHexLower(const void* inRaw, int len){
+ static const char hexchars[] = "0123456789abcdef";
+
+ StringBuilder out;
+ const char* in = reinterpret_cast<const char*>(inRaw);
+ for (int i=0; i<len; ++i){
+ char c = in[i];
+ char hi = hexchars[(c & 0xF0) >> 4];
+ char lo = hexchars[(c & 0x0F)];
+
+ out << hi << lo;
+ }
+
+ return out.str();
+ }
}
diff --git a/util/histogram.cpp b/util/histogram.cpp
new file mode 100644
index 0000000..4541dfd
--- /dev/null
+++ b/util/histogram.cpp
@@ -0,0 +1,129 @@
+// histogram.cc
+
+/**
+* Copyright (C) 2010 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include <iomanip>
+#include <limits>
+#include <sstream>
+
+#include "histogram.h"
+
+namespace mongo {
+
+ using std::ostringstream;
+ using std::setfill;
+ using std::setw;
+
+ Histogram::Histogram( const Options& opts )
+ : _initialValue( opts.initialValue )
+ , _numBuckets( opts.numBuckets )
+ , _boundaries( new uint32_t[_numBuckets] )
+ , _buckets( new uint64_t[_numBuckets] ){
+
+ // TODO more sanity checks
+ // + not too few buckets
+ // + initialBucket and bucketSize fit within 32 bit ints
+
+ // _boundaries store the maximum value falling in that bucket.
+ if ( opts.exponential ){
+ uint32_t twoPow = 1; // 2^0
+ for ( uint32_t i = 0; i < _numBuckets - 1; i++){
+ _boundaries[i] = _initialValue + opts.bucketSize * twoPow;
+ twoPow *= 2; // 2^i+1
+ }
+ } else {
+ _boundaries[0] = _initialValue + opts.bucketSize;
+ for ( uint32_t i = 1; i < _numBuckets - 1; i++ ){
+ _boundaries[i] = _boundaries[ i-1 ] + opts.bucketSize;
+ }
+ }
+ _boundaries[ _numBuckets-1 ] = std::numeric_limits<uint32_t>::max();
+
+ for ( uint32_t i = 0; i < _numBuckets; i++ ) {
+ _buckets[i] = 0;
+ }
+ }
+
+ Histogram::~Histogram() {
+ delete [] _boundaries;
+ delete [] _buckets;
+ }
+
+ void Histogram::insert( uint32_t element ){
+ if ( element < _initialValue) return;
+
+ _buckets[ _findBucket(element) ] += 1;
+ }
+
+ string Histogram::toHTML() const{
+ uint64_t max = 0;
+ for ( uint32_t i = 0; i < _numBuckets; i++ ){
+ if ( _buckets[i] > max ){
+ max = _buckets[i];
+ }
+ }
+ if ( max == 0 ) {
+ return "histogram is empty\n";
+ }
+
+ // normalize buckets to max
+ const int maxBar = 20;
+ ostringstream ss;
+ for ( uint32_t i = 0; i < _numBuckets; i++ ){
+ int barSize = _buckets[i] * maxBar / max;
+ ss << string( barSize,'*' )
+ << setfill(' ') << setw( maxBar-barSize + 12 )
+ << _boundaries[i] << '\n';
+ }
+
+ return ss.str();
+ }
+
+ uint64_t Histogram::getCount( uint32_t bucket ) const {
+ if ( bucket >= _numBuckets ) return 0;
+
+ return _buckets[ bucket ];
+ }
+
+ uint32_t Histogram::getBoundary( uint32_t bucket ) const {
+ if ( bucket >= _numBuckets ) return 0;
+
+ return _boundaries[ bucket ];
+ }
+
+ uint32_t Histogram::getBucketsNum() const {
+ return _numBuckets;
+ }
+
+ uint32_t Histogram::_findBucket( uint32_t element ) const{
+ // TODO assert not too small a value?
+
+ uint32_t low = 0;
+ uint32_t high = _numBuckets - 1;
+ while ( low < high ){
+ // low + ( (high - low) / 2 );
+ uint32_t mid = ( low + high ) >> 1;
+ if ( element > _boundaries[ mid ] ){
+ low = mid + 1;
+ } else {
+ high = mid;
+ }
+ }
+ return low;
+ }
+
+} // namespace mongo
diff --git a/util/histogram.h b/util/histogram.h
new file mode 100644
index 0000000..d4a6fa7
--- /dev/null
+++ b/util/histogram.h
@@ -0,0 +1,128 @@
+// histogram.h
+
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef UTIL_HISTOGRAM_HEADER
+#define UTIL_HISTOGRAM_HEADER
+
+#include "../pch.h"
+
+#include <string>
+
+namespace mongo {
+
+ using std::string;
+
+ /**
+ * A histogram for a 32-bit integer range.
+ */
+ class Histogram {
+ public:
+ /**
+ * Construct a histogram with 'numBuckets' buckets, optionally
+ * having the first bucket start at 'initialValue' rather than
+ * 0. By default, the histogram buckets will be 'bucketSize' wide.
+ *
+ * Usage example:
+ * Histogram::Options opts;
+ * opts.numBuckets = 3;
+ * opts.bucketSize = 10;
+ * Histogram h( opts );
+ *
+ * Generates the bucket ranges [0..10],[11..20],[21..max_int]
+ *
+ * Alternatively, the flag 'exponential' could be turned on, in
+ * which case a bucket's maximum value will be
+ * initialValue + bucketSize * 2 ^ [0..numBuckets-1]
+ *
+ * Usage example:
+ * Histogram::Options opts;
+ * opts.numBuckets = 4;
+ * opts.bucketSize = 125;
+ * opts.exponential = true;
+ * Histogram h( opts );
+ *
+ * Generates the bucket ranges [0..125],[126..250],[251..500],[501..max_int]
+ */
+ struct Options {
+ boost::uint32_t numBuckets;
+ boost::uint32_t bucketSize;
+ boost::uint32_t initialValue;
+
+ // use exponential buckets?
+ bool exponential;
+
+ Options()
+ : numBuckets(0)
+ , bucketSize(0)
+ , initialValue(0)
+ , exponential(false){}
+ };
+ explicit Histogram( const Options& opts );
+ ~Histogram();
+
+ /**
+ * Find the bucket that 'element' falls into and increment its count.
+ */
+ void insert( boost::uint32_t element );
+
+ /**
+ * Render the histogram as string that can be used inside an
+ * HTML doc.
+ */
+ string toHTML() const;
+
+ // testing interface below -- consider it private
+
+ /**
+ * Return the count for the 'bucket'-th bucket.
+ */
+ boost::uint64_t getCount( boost::uint32_t bucket ) const;
+
+ /**
+ * Return the maximum element that would fall in the
+ * 'bucket'-th bucket.
+ */
+ boost::uint32_t getBoundary( boost::uint32_t bucket ) const;
+
+ /**
+ * Return the number of buckets in this histogram.
+ */
+ boost::uint32_t getBucketsNum() const;
+
+ private:
+ /**
+ * Returns the bucket where 'element' should fall
+ * into. Currently assumes that 'element' is greater than the
+ * minimum 'inialValue'.
+ */
+ boost::uint32_t _findBucket( boost::uint32_t element ) const;
+
+ boost::uint32_t _initialValue; // no value lower than it is recorded
+ boost::uint32_t _numBuckets; // total buckets in the histogram
+
+ // all below owned here
+ boost::uint32_t* _boundaries; // maximum element of each bucket
+ boost::uint64_t* _buckets; // current count of each bucket
+
+ Histogram( const Histogram& );
+ Histogram& operator=( const Histogram& );
+ };
+
+} // namespace mongo
+
+#endif // UTIL_HISTOGRAM_HEADER
diff --git a/util/hostandport.h b/util/hostandport.h
new file mode 100644
index 0000000..6124570
--- /dev/null
+++ b/util/hostandport.h
@@ -0,0 +1,142 @@
+// hostandport.h
+
+/* Copyright 2009 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "sock.h"
+#include "../db/cmdline.h"
+#include "mongoutils/str.h"
+
+namespace mongo {
+
+ using namespace mongoutils;
+
+ /** helper for manipulating host:port connection endpoints.
+ */
+ struct HostAndPort {
+ HostAndPort() : _port(-1) { }
+
+ /** From a string hostname[:portnumber]
+ Throws user assertion if bad config string or bad port #.
+ */
+ HostAndPort(string s);
+
+ /** @param p port number. -1 is ok to use default. */
+ HostAndPort(string h, int p /*= -1*/) : _host(h), _port(p) { }
+
+ HostAndPort(const SockAddr& sock )
+ : _host( sock.getAddr() ) , _port( sock.getPort() ){
+ }
+
+ static HostAndPort me() {
+ return HostAndPort("localhost", cmdLine.port);
+ }
+
+ /* uses real hostname instead of localhost */
+ static HostAndPort Me();
+
+ bool operator<(const HostAndPort& r) const {
+ if( _host < r._host )
+ return true;
+ if( _host == r._host )
+ return port() < r.port();
+ return false;
+ }
+
+ bool operator==(const HostAndPort& r) const {
+ return _host == r._host && port() == r.port();
+ }
+
+ /* returns true if the host/port combo identifies this process instance. */
+ bool isSelf() const; // defined in message.cpp
+
+ bool isLocalHost() const;
+
+ // @returns host:port
+ string toString() const;
+
+ operator string() const { return toString(); }
+
+ string host() const { return _host; }
+
+ int port() const { return _port >= 0 ? _port : CmdLine::DefaultDBPort; }
+ bool hasPort() const { return _port >= 0; }
+ void setPort( int port ) { _port = port; }
+
+ private:
+ // invariant (except full obj assignment):
+ string _host;
+ int _port; // -1 indicates unspecified
+ };
+
+ /** returns true if strings seem to be the same hostname.
+ "nyc1" and "nyc1.acme.com" are treated as the same.
+ in fact "nyc1.foo.com" and "nyc1.acme.com" are treated the same -
+ we oly look up to the first period.
+ */
+ inline bool sameHostname(const string& a, const string& b) {
+ return str::before(a, '.') == str::before(b, '.');
+ }
+
+ inline HostAndPort HostAndPort::Me() {
+ string h = getHostName();
+ assert( !h.empty() );
+ assert( h != "localhost" );
+ return HostAndPort(h, cmdLine.port);
+ }
+
+ inline string HostAndPort::toString() const {
+ stringstream ss;
+ ss << _host;
+ if ( _port != -1 ){
+ ss << ':';
+#if defined(_DEBUG)
+ if( _port >= 44000 && _port < 44100 ) {
+ log() << "warning: special debug port 44xxx used" << endl;
+ ss << _port+1;
+ }
+ else
+ ss << _port;
+#else
+ ss << _port;
+#endif
+ }
+ return ss.str();
+ }
+
+ inline bool HostAndPort::isLocalHost() const {
+ return _host == "localhost" || startsWith(_host.c_str(), "127.") || _host == "::1";
+ }
+
+ inline HostAndPort::HostAndPort(string s) {
+ const char *p = s.c_str();
+ uassert(13110, "HostAndPort: bad config string", *p);
+ const char *colon = strrchr(p, ':');
+ if( colon ) {
+ int port = atoi(colon+1);
+ uassert(13095, "HostAndPort: bad port #", port > 0);
+ _host = string(p,colon-p);
+ _port = port;
+ }
+ else {
+ // no port specified.
+ _host = p;
+ _port = -1;
+ }
+ }
+
+}
diff --git a/util/httpclient.cpp b/util/httpclient.cpp
index 08b6220..4f78029 100644
--- a/util/httpclient.cpp
+++ b/util/httpclient.cpp
@@ -15,11 +15,11 @@
* limitations under the License.
*/
-#include "stdafx.h"
+#include "pch.h"
#include "httpclient.h"
#include "sock.h"
#include "message.h"
-#include "builder.h"
+#include "../bson/util/builder.h"
namespace mongo {
@@ -94,15 +94,11 @@ namespace mongo {
{
const char * out = req.c_str();
int toSend = req.size();
- while ( toSend ){
- int did = p.send( out , toSend );
- toSend -= did;
- out += did;
- }
+ p.send( out , toSend, "_go" );
}
char buf[4096];
- int got = p.recv( buf , 4096 );
+ int got = p.unsafe_recv( buf , 4096 );
buf[got] = 0;
int rc;
@@ -114,19 +110,41 @@ namespace mongo {
if ( result )
sb << buf;
- while ( ( got = p.recv( buf , 4096 ) ) > 0){
+ while ( ( got = p.unsafe_recv( buf , 4096 ) ) > 0){
if ( result )
sb << buf;
}
if ( result ){
- result->_code = rc;
- result->_entireResponse = sb.str();
+ result->_init( rc , sb.str() );
}
return rc;
}
+ void HttpClient::Result::_init( int code , string entire ){
+ _code = code;
+ _entireResponse = entire;
+
+ while ( true ){
+ size_t i = entire.find( '\n' );
+ if ( i == string::npos ){
+ // invalid
+ break;
+ }
+
+ string h = entire.substr( 0 , i );
+ entire = entire.substr( i + 1 );
+
+ if ( h.size() && h[h.size()-1] == '\r' )
+ h = h.substr( 0 , h.size() - 1 );
+
+ if ( h.size() == 0 )
+ break;
+ }
+
+ _body = entire;
+ }
}
diff --git a/util/httpclient.h b/util/httpclient.h
index ef3e147..8b9da97 100644
--- a/util/httpclient.h
+++ b/util/httpclient.h
@@ -17,7 +17,7 @@
#pragma once
-#include "../stdafx.h"
+#include "../pch.h"
namespace mongo {
@@ -31,9 +31,25 @@ namespace mongo {
const string& getEntireResponse() const {
return _entireResponse;
}
+
+ const map<string,string> getHeaders() const {
+ return _headers;
+ }
+
+ const string& getBody() const {
+ return _body;
+ }
+
private:
+
+ void _init( int code , string entire );
+
int _code;
string _entireResponse;
+
+ map<string,string> _headers;
+ string _body;
+
friend class HttpClient;
};
diff --git a/util/log.cpp b/util/log.cpp
new file mode 100644
index 0000000..bfd9154
--- /dev/null
+++ b/util/log.cpp
@@ -0,0 +1,127 @@
+/** @file log.cpp
+ */
+
+/* Copyright 2009 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "pch.h"
+#include "assert_util.h"
+#include "assert.h"
+#include "file.h"
+#include <cmath>
+using namespace std;
+
+#ifndef _WIN32
+#include <cxxabi.h>
+#include <sys/file.h>
+#endif
+
+#include "../db/jsobj.h"
+
+namespace mongo {
+
+ Nullstream nullstream;
+ vector<Tee*>* Logstream::globalTees = 0;
+
+ thread_specific_ptr<Logstream> Logstream::tsp;
+
+ class LoggingManager {
+ public:
+ LoggingManager()
+ : _enabled(0) , _file(0) {
+ }
+
+ void start( const string& lp , bool append ){
+ uassert( 10268 , "LoggingManager already started" , ! _enabled );
+ _append = append;
+
+ // test path
+ FILE * test = fopen( lp.c_str() , _append ? "a" : "w" );
+ if ( ! test ){
+ cout << "can't open [" << lp << "] for log file: " << errnoWithDescription() << endl;
+ dbexit( EXIT_BADOPTIONS );
+ assert( 0 );
+ }
+ fclose( test );
+
+ _path = lp;
+ _enabled = 1;
+ rotate();
+ }
+
+ void rotate(){
+ if ( ! _enabled ){
+ cout << "LoggingManager not enabled" << endl;
+ return;
+ }
+
+ if ( _file ){
+#ifdef _WIN32
+ cout << "log rotation doesn't work on windows" << endl;
+ return;
+#else
+ struct tm t;
+ localtime_r( &_opened , &t );
+
+ stringstream ss;
+ ss << _path << "." << terseCurrentTime(false);
+ string s = ss.str();
+ rename( _path.c_str() , s.c_str() );
+#endif
+ }
+
+
+ FILE* tmp = fopen(_path.c_str(), (_append ? "a" : "w"));
+ if (!tmp){
+ cerr << "can't open: " << _path.c_str() << " for log file" << endl;
+ dbexit( EXIT_BADOPTIONS );
+ assert(0);
+ }
+
+ Logstream::setLogFile(tmp); // after this point no thread will be using old file
+
+ if (_file){
+ fclose(_file);
+ }
+
+ _file = tmp;
+ _opened = time(0);
+ }
+
+ private:
+
+ bool _enabled;
+ string _path;
+ bool _append;
+
+ FILE * _file;
+ time_t _opened;
+
+ } loggingManager;
+
+ void initLogging( const string& lp , bool append ){
+ cout << "all output going to: " << lp << endl;
+ loggingManager.start( lp , append );
+ }
+
+ void rotateLogs( int signal ){
+ loggingManager.rotate();
+ }
+
+ // done *before* static initialization
+ FILE* Logstream::logfile = stdout;
+
+}
+
diff --git a/util/log.h b/util/log.h
index 668557a..152fb80 100644
--- a/util/log.h
+++ b/util/log.h
@@ -1,4 +1,4 @@
-// log.h
+// @file log.h
/* Copyright 2009 10gen Inc.
*
@@ -19,12 +19,33 @@
#include <string.h>
#include <errno.h>
+#include "../bson/util/builder.h"
-namespace mongo {
+#ifndef _WIN32
+//#include <syslog.h>
+#endif
- using boost::shared_ptr;
+namespace mongo {
- // Utility interface for stringifying object only when val() called.
+ enum LogLevel { LL_DEBUG , LL_INFO , LL_NOTICE , LL_WARNING , LL_ERROR , LL_SEVERE };
+
+ inline const char * logLevelToString( LogLevel l ){
+ switch ( l ){
+ case LL_DEBUG:
+ case LL_INFO:
+ case LL_NOTICE:
+ return "";
+ case LL_WARNING:
+ return "warning" ;
+ case LL_ERROR:
+ return "ERROR";
+ case LL_SEVERE:
+ return "SEVERE";
+ default:
+ return "UNKNOWN";
+ }
+ }
+
class LazyString {
public:
virtual ~LazyString() {}
@@ -36,17 +57,29 @@ namespace mongo {
class LazyStringImpl : public LazyString {
public:
LazyStringImpl( const T &t ) : t_( t ) {}
- virtual string val() const { return (string)t_; }
+ virtual string val() const { return t_.toString(); }
private:
const T& t_;
};
+ class Tee {
+ public:
+ virtual ~Tee(){}
+ virtual void write(LogLevel level , const string& str) = 0;
+ };
+
class Nullstream {
public:
+ virtual Nullstream& operator<< (Tee* tee) {
+ return *this;
+ }
virtual ~Nullstream() {}
virtual Nullstream& operator<<(const char *) {
return *this;
}
+ virtual Nullstream& operator<<(const string& ) {
+ return *this;
+ }
virtual Nullstream& operator<<(char *) {
return *this;
}
@@ -111,50 +144,118 @@ namespace mongo {
virtual Nullstream& operator<< (ios_base& (*hex)(ios_base&)) {
return *this;
}
- virtual void flush(){}
+ virtual void flush(Tee *t = 0) {}
};
extern Nullstream nullstream;
-#define LOGIT { ss << x; return *this; }
-
class Logstream : public Nullstream {
static mongo::mutex mutex;
static int doneSetup;
stringstream ss;
+ LogLevel logLevel;
+ static FILE* logfile;
+ static boost::scoped_ptr<ostream> stream;
+ static vector<Tee*> * globalTees;
public:
+
+ static void logLockless( const StringData& s ){
+ if ( doneSetup == 1717 ){
+ fwrite( s.data() , s.size() , 1 , logfile );
+ fflush( logfile );
+ }
+ else {
+ cout << s.data() << endl;
+ }
+ }
+
+ static void setLogFile(FILE* f){
+ scoped_lock lk(mutex);
+ logfile = f;
+ }
+
static int magicNumber(){
return 1717;
}
- void flush() {
+
+ void flush(Tee *t = 0) {
// this ensures things are sane
- if ( doneSetup == 1717 ){
+ if ( doneSetup == 1717 ) {
+ string msg = ss.str();
+ string threadName = getThreadName();
+ const char * type = logLevelToString(logLevel);
+
+ int spaceNeeded = msg.size() + 64 + threadName.size();
+ int bufSize = 128;
+ while ( bufSize < spaceNeeded )
+ bufSize += 128;
+
+ BufBuilder b(bufSize);
+ time_t_to_String( time(0) , b.grow(20) );
+ if (!threadName.empty()){
+ b.appendChar( '[' );
+ b.appendStr( threadName , false );
+ b.appendChar( ']' );
+ b.appendChar( ' ' );
+ }
+ if ( type[0] ){
+ b.appendStr( type , false );
+ b.appendStr( ": " , false );
+ }
+ b.appendStr( msg );
+
+ string out( b.buf() , b.len() - 1);
+
scoped_lock lk(mutex);
- cout << ss.str();
- cout.flush();
+
+ if( t ) t->write(logLevel,out);
+ if ( globalTees ){
+ for ( unsigned i=0; i<globalTees->size(); i++ )
+ (*globalTees)[i]->write(logLevel,out);
+ }
+
+#ifndef _WIN32
+ //syslog( LOG_INFO , "%s" , cc );
+#endif
+ fwrite(out.data(), out.size(), 1, logfile);
+ fflush(logfile);
}
- ss.str("");
+ _init();
}
- Logstream& operator<<(const char *x) LOGIT
- Logstream& operator<<(char *x) LOGIT
- Logstream& operator<<(char x) LOGIT
- Logstream& operator<<(int x) LOGIT
- Logstream& operator<<(ExitCode x) LOGIT
- Logstream& operator<<(long x) LOGIT
- Logstream& operator<<(unsigned long x) LOGIT
- Logstream& operator<<(unsigned x) LOGIT
- Logstream& operator<<(double x) LOGIT
- Logstream& operator<<(void *x) LOGIT
- Logstream& operator<<(const void *x) LOGIT
- Logstream& operator<<(long long x) LOGIT
- Logstream& operator<<(unsigned long long x) LOGIT
- Logstream& operator<<(bool x) LOGIT
+
+ Nullstream& setLogLevel(LogLevel l){
+ logLevel = l;
+ return *this;
+ }
+
+ /** note these are virtual */
+ Logstream& operator<<(const char *x) { ss << x; return *this; }
+ Logstream& operator<<(const string& x) { ss << x; return *this; }
+ Logstream& operator<<(char *x) { ss << x; return *this; }
+ Logstream& operator<<(char x) { ss << x; return *this; }
+ Logstream& operator<<(int x) { ss << x; return *this; }
+ Logstream& operator<<(ExitCode x) { ss << x; return *this; }
+ Logstream& operator<<(long x) { ss << x; return *this; }
+ Logstream& operator<<(unsigned long x) { ss << x; return *this; }
+ Logstream& operator<<(unsigned x) { ss << x; return *this; }
+ Logstream& operator<<(double x) { ss << x; return *this; }
+ Logstream& operator<<(void *x) { ss << x; return *this; }
+ Logstream& operator<<(const void *x) { ss << x; return *this; }
+ Logstream& operator<<(long long x) { ss << x; return *this; }
+ Logstream& operator<<(unsigned long long x) { ss << x; return *this; }
+ Logstream& operator<<(bool x) { ss << x; return *this; }
+
Logstream& operator<<(const LazyString& x) {
ss << x.val();
return *this;
}
+ Nullstream& operator<< (Tee* tee) {
+ ss << '\n';
+ flush(tee);
+ return *this;
+ }
Logstream& operator<< (ostream& ( *_endl )(ostream&)) {
ss << '\n';
- flush();
+ flush(0);
return *this;
}
Logstream& operator<< (ios_base& (*_hex)(ios_base&)) {
@@ -168,20 +269,29 @@ namespace mongo {
if ( ! t )
*this << "null";
else
- *this << t;
+ *this << *t;
return *this;
}
Logstream& prolog() {
- char now[64];
- time_t_to_String(time(0), now);
- now[20] = 0;
- ss << now;
return *this;
}
+
+ void addGlobalTee( Tee * t ){
+ if ( ! globalTees )
+ globalTees = new vector<Tee*>();
+ globalTees->push_back( t );
+ }
private:
static thread_specific_ptr<Logstream> tsp;
+ Logstream(){
+ _init();
+ }
+ void _init(){
+ ss.str("");
+ logLevel = LL_INFO;
+ }
public:
static Logstream& get() {
Logstream *p = tsp.get();
@@ -192,6 +302,7 @@ namespace mongo {
};
extern int logLevel;
+ extern int tlogLevel;
inline Nullstream& out( int level = 0 ) {
if ( level > logLevel )
@@ -203,7 +314,7 @@ namespace mongo {
at the specified level or higher. */
inline void logflush(int level = 0) {
if( level > logLevel )
- Logstream::get().flush();
+ Logstream::get().flush(0);
}
/* without prolog */
@@ -213,12 +324,29 @@ namespace mongo {
return Logstream::get();
}
- inline Nullstream& log( int level = 0 ) {
+ /** logging which we may not want during unit tests runs.
+ set tlogLevel to -1 to suppress tlog() output in a test program. */
+ inline Nullstream& tlog( int level = 0 ) {
+ if ( level > tlogLevel || level > logLevel )
+ return nullstream;
+ return Logstream::get().prolog();
+ }
+
+ inline Nullstream& log( int level ) {
if ( level > logLevel )
return nullstream;
return Logstream::get().prolog();
}
+ inline Nullstream& log( LogLevel l ) {
+ return Logstream::get().prolog().setLogLevel( l );
+ }
+
+
+ inline Nullstream& log() {
+ return Logstream::get().prolog();
+ }
+
/* TODOCONCURRENCY */
inline ostream& stdcout() {
return cout;
@@ -242,9 +370,52 @@ namespace mongo {
void initLogging( const string& logpath , bool append );
void rotateLogs( int signal = 0 );
-#define OUTPUT_ERRNOX(x) "errno:" << x << " " << strerror(x)
-#define OUTPUT_ERRNO OUTPUT_ERRNOX(errno)
+ std::string toUtf8String(const std::wstring& wide);
+
+ inline string errnoWithDescription(int x = errno) {
+ stringstream s;
+ s << "errno:" << x << ' ';
+
+#if defined(_WIN32)
+ LPTSTR errorText = NULL;
+ FormatMessage(
+ FORMAT_MESSAGE_FROM_SYSTEM
+ |FORMAT_MESSAGE_ALLOCATE_BUFFER
+ |FORMAT_MESSAGE_IGNORE_INSERTS,
+ NULL,
+ x, 0,
+ (LPTSTR) &errorText, // output
+ 0, // minimum size for output buffer
+ NULL);
+ if( errorText ) {
+ string x = toUtf8String(errorText);
+ for( string::iterator i = x.begin(); i != x.end(); i++ ) {
+ if( *i == '\n' || *i == '\r' )
+ break;
+ s << *i;
+ }
+ LocalFree(errorText);
+ }
+ else
+ s << strerror(x);
+ /*
+ DWORD n = FormatMessage(
+ FORMAT_MESSAGE_ALLOCATE_BUFFER |
+ FORMAT_MESSAGE_FROM_SYSTEM |
+ FORMAT_MESSAGE_IGNORE_INSERTS,
+ NULL, x,
+ MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
+ (LPTSTR) &lpMsgBuf, 0, NULL);
+ */
+#else
+ s << strerror(x);
+#endif
+ return s.str();
+ }
- string errnostring( const char * prefix = 0 );
+ /** output the error # and error message with prefix.
+ handy for use as parm in uassert/massert.
+ */
+ string errnoWithPrefix( const char * prefix = 0 );
} // namespace mongo
diff --git a/util/lruishmap.h b/util/lruishmap.h
index c390cb2..fe8b1dc 100644
--- a/util/lruishmap.h
+++ b/util/lruishmap.h
@@ -17,7 +17,7 @@
#pragma once
-#include "../stdafx.h"
+#include "../pch.h"
#include "../util/goodies.h"
namespace mongo {
diff --git a/util/md5main.cpp b/util/md5main.cpp
index d5b4982..9c56f91 100644
--- a/util/md5main.cpp
+++ b/util/md5main.cpp
@@ -40,7 +40,7 @@
2002-04-13 lpd Splits off main program into a separate file, md5main.c.
*/
-#include "stdafx.h"
+#include "pch.h"
#include "md5.h"
#include <math.h>
#include <stdio.h>
diff --git a/util/message.cpp b/util/message.cpp
index 2c3d006..d7c13dc 100644
--- a/util/message.cpp
+++ b/util/message.cpp
@@ -18,7 +18,7 @@
* limitations under the License.
*/
-#include "stdafx.h"
+#include "pch.h"
#include "message.h"
#include <time.h>
#include "../util/goodies.h"
@@ -26,10 +26,25 @@
#include <fcntl.h>
#include <errno.h>
#include "../db/cmdline.h"
+#include "../client/dbclient.h"
+
+#ifndef _WIN32
+#include <sys/resource.h>
+#else
+
+// errno doesn't work for winsock.
+#undef errno
+#define errno WSAGetLastError()
+
+#endif
namespace mongo {
+ bool noUnixSocket = false;
+
bool objcheck = false;
+
+ void checkTicketNumbers();
// if you want trace output:
#define mmm(x)
@@ -42,54 +57,165 @@ namespace mongo {
const int portRecvFlags = 0;
#endif
- /* listener ------------------------------------------------------------------- */
+ const Listener* Listener::_timeTracker;
- bool Listener::init() {
- SockAddr me;
- if ( ip.empty() )
- me = SockAddr( port );
- else
- me = SockAddr( ip.c_str(), port );
- sock = ::socket(AF_INET, SOCK_STREAM, 0);
- if ( sock == INVALID_SOCKET ) {
- log() << "ERROR: listen(): invalid socket? " << OUTPUT_ERRNO << endl;
- return false;
- }
- prebindOptions( sock );
- if ( ::bind(sock, (sockaddr *) &me.sa, me.addressSize) != 0 ) {
- log() << "listen(): bind() failed " << OUTPUT_ERRNO << " for port: " << port << endl;
- closesocket(sock);
- return false;
+ vector<SockAddr> ipToAddrs(const char* ips, int port){
+ vector<SockAddr> out;
+ if (*ips == '\0'){
+ out.push_back(SockAddr("0.0.0.0", port)); // IPv4 all
+
+ if (IPv6Enabled())
+ out.push_back(SockAddr("::", port)); // IPv6 all
+#ifndef _WIN32
+ if (!noUnixSocket)
+ out.push_back(SockAddr(makeUnixSockPath(port).c_str(), port)); // Unix socket
+#endif
+ return out;
}
- if ( ::listen(sock, 128) != 0 ) {
- log() << "listen(): listen() failed " << OUTPUT_ERRNO << endl;
- closesocket(sock);
- return false;
+ while(*ips){
+ string ip;
+ const char * comma = strchr(ips, ',');
+ if (comma){
+ ip = string(ips, comma - ips);
+ ips = comma + 1;
+ }else{
+ ip = string(ips);
+ ips = "";
+ }
+
+ SockAddr sa(ip.c_str(), port);
+ out.push_back(sa);
+
+#ifndef _WIN32
+ if (!noUnixSocket && (sa.getAddr() == "127.0.0.1" || sa.getAddr() == "0.0.0.0")) // only IPv4
+ out.push_back(SockAddr(makeUnixSockPath(port).c_str(), port));
+#endif
}
-
- return true;
+ return out;
+
}
- void Listener::listen() {
+ /* listener ------------------------------------------------------------------- */
+
+ void Listener::initAndListen() {
+ checkTicketNumbers();
+ vector<SockAddr> mine = ipToAddrs(_ip.c_str(), _port);
+ vector<int> socks;
+ SOCKET maxfd = 0; // needed for select()
+
+ for (vector<SockAddr>::iterator it=mine.begin(), end=mine.end(); it != end; ++it){
+ SockAddr& me = *it;
+
+ SOCKET sock = ::socket(me.getType(), SOCK_STREAM, 0);
+ if ( sock == INVALID_SOCKET ) {
+ log() << "ERROR: listen(): invalid socket? " << errnoWithDescription() << endl;
+ }
+
+ if (me.getType() == AF_UNIX){
+#if !defined(_WIN32)
+ if (unlink(me.getAddr().c_str()) == -1){
+ int x = errno;
+ if (x != ENOENT){
+ log() << "couldn't unlink socket file " << me << errnoWithDescription(x) << " skipping" << endl;
+ continue;
+ }
+ }
+#endif
+ } else if (me.getType() == AF_INET6) {
+ // IPv6 can also accept IPv4 connections as mapped addresses (::ffff:127.0.0.1)
+ // That causes a conflict if we don't do set it to IPV6_ONLY
+ const int one = 1;
+ setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (const char*) &one, sizeof(one));
+ }
+
+ prebindOptions( sock );
+
+ if ( ::bind(sock, me.raw(), me.addressSize) != 0 ) {
+ int x = errno;
+ log() << "listen(): bind() failed " << errnoWithDescription(x) << " for socket: " << me.toString() << endl;
+ if ( x == EADDRINUSE )
+ log() << " addr already in use" << endl;
+ closesocket(sock);
+ return;
+ }
+
+ if ( ::listen(sock, 128) != 0 ) {
+ log() << "listen(): listen() failed " << errnoWithDescription() << endl;
+ closesocket(sock);
+ return;
+ }
+
+ ListeningSockets::get()->add( sock );
+
+ socks.push_back(sock);
+ if (sock > maxfd)
+ maxfd = sock;
+ }
+
static long connNumber = 0;
- SockAddr from;
+ struct timeval maxSelectTime;
while ( ! inShutdown() ) {
- int s = accept(sock, (sockaddr *) &from.sa, &from.addressSize);
- if ( s < 0 ) {
- if ( errno == ECONNABORTED || errno == EBADF ) {
- log() << "Listener on port " << port << " aborted" << endl;
- return;
- }
- log() << "Listener: accept() returns " << s << " " << OUTPUT_ERRNO << endl;
+ fd_set fds[1];
+ FD_ZERO(fds);
+
+ for (vector<int>::iterator it=socks.begin(), end=socks.end(); it != end; ++it){
+ FD_SET(*it, fds);
+ }
+
+ maxSelectTime.tv_sec = 0;
+ maxSelectTime.tv_usec = 10000;
+ const int ret = select(maxfd+1, fds, NULL, NULL, &maxSelectTime);
+
+ if (ret == 0){
+ _elapsedTime += ( 10000 - maxSelectTime.tv_usec ) / 1000;
continue;
}
- disableNagle(s);
- if ( ! cmdLine.quiet ) log() << "connection accepted from " << from.toString() << " #" << ++connNumber << endl;
- accepted( new MessagingPort(s, from) );
+ _elapsedTime += ret; // assume 1ms to grab connection. very rough
+
+ if (ret < 0){
+ int x = errno;
+#ifdef EINTR
+ if ( x == EINTR ){
+ log() << "select() signal caught, continuing" << endl;
+ continue;
+ }
+#endif
+ if ( ! inShutdown() )
+ log() << "select() failure: ret=" << ret << " " << errnoWithDescription(x) << endl;
+ return;
+ }
+
+ for (vector<int>::iterator it=socks.begin(), end=socks.end(); it != end; ++it){
+ if (! (FD_ISSET(*it, fds)))
+ continue;
+
+ SockAddr from;
+ int s = accept(*it, from.raw(), &from.addressSize);
+ if ( s < 0 ) {
+ int x = errno; // so no global issues
+ if ( x == ECONNABORTED || x == EBADF ) {
+ log() << "Listener on port " << _port << " aborted" << endl;
+ return;
+ } if ( x == 0 && inShutdown() ){
+ return; // socket closed
+ }
+ log() << "Listener: accept() returns " << s << " " << errnoWithDescription(x) << endl;
+ continue;
+ }
+ if (from.getType() != AF_UNIX)
+ disableNagle(s);
+ if ( _logConnect && ! cmdLine.quiet )
+ log() << "connection accepted from " << from.toString() << " #" << ++connNumber << endl;
+ accepted(s, from);
+ }
}
}
+ void Listener::accepted(int sock, const SockAddr& from){
+ accepted( new MessagingPort(sock, from) );
+ }
+
/* messagingport -------------------------------------------------------------- */
class PiggyBackData {
@@ -101,27 +227,28 @@ namespace mongo {
}
~PiggyBackData() {
- flush();
- delete( _cur );
+ DESTRUCTOR_GUARD (
+ flush();
+ delete[]( _cur );
+ );
}
void append( Message& m ) {
- assert( m.data->len <= 1300 );
+ assert( m.header()->len <= 1300 );
- if ( len() + m.data->len > 1300 )
+ if ( len() + m.header()->len > 1300 )
flush();
- memcpy( _cur , m.data , m.data->len );
- _cur += m.data->len;
+ memcpy( _cur , m.singleData() , m.header()->len );
+ _cur += m.header()->len;
}
- int flush() {
+ void flush() {
if ( _buf == _cur )
- return 0;
+ return;
- int x = _port->send( _buf , len() );
+ _port->send( _buf , len(), "flush" );
_cur = _buf;
- return x;
}
int len() {
@@ -137,12 +264,10 @@ namespace mongo {
};
class Ports {
- set<MessagingPort*>& ports;
+ set<MessagingPort*> ports;
mongo::mutex m;
public:
- // we "new" this so it is still be around when other automatic global vars
- // are being destructed during termination.
- Ports() : ports( *(new set<MessagingPort*>()) ) {}
+ Ports() : ports(), m("Ports") {}
void closeAll() { \
scoped_lock bl(m);
for ( set<MessagingPort*>::iterator i = ports.begin(); i != ports.end(); i++ )
@@ -156,7 +281,11 @@ namespace mongo {
scoped_lock bl(m);
ports.erase(p);
}
- } ports;
+ };
+
+ // we "new" this so it is still be around when other automatic global vars
+ // are being destructed during termination.
+ Ports& ports = *(new Ports());
@@ -164,14 +293,17 @@ namespace mongo {
ports.closeAll();
}
- MessagingPort::MessagingPort(int _sock, SockAddr& _far) : sock(_sock), piggyBackData(0), farEnd(_far) {
+ MessagingPort::MessagingPort(int _sock, const SockAddr& _far) : sock(_sock), piggyBackData(0), farEnd(_far), _timeout() {
+ _logLevel = 0;
ports.insert(this);
}
- MessagingPort::MessagingPort() {
+ MessagingPort::MessagingPort( int timeout, int ll ) {
+ _logLevel = ll;
ports.insert(this);
sock = -1;
piggyBackData = 0;
+ _timeout = timeout;
}
void MessagingPort::shutdown() {
@@ -194,42 +326,30 @@ namespace mongo {
int res;
SockAddr farEnd;
void run() {
- res = ::connect(sock, (sockaddr *) &farEnd.sa, farEnd.addressSize);
+ res = ::connect(sock, farEnd.raw(), farEnd.addressSize);
}
+ string name() { return "ConnectBG"; }
};
bool MessagingPort::connect(SockAddr& _far)
{
farEnd = _far;
- sock = socket(AF_INET, SOCK_STREAM, 0);
+ sock = socket(farEnd.getType(), SOCK_STREAM, 0);
if ( sock == INVALID_SOCKET ) {
- log() << "ERROR: connect(): invalid socket? " << OUTPUT_ERRNO << endl;
+ log(_logLevel) << "ERROR: connect(): invalid socket? " << errnoWithDescription() << endl;
return false;
}
-#if 0
- long fl = fcntl(sock, F_GETFL, 0);
- assert( fl >= 0 );
- fl |= O_NONBLOCK;
- fcntl(sock, F_SETFL, fl);
-
- int res = ::connect(sock, (sockaddr *) &farEnd.sa, farEnd.addressSize);
- if ( res ) {
- if ( errno == EINPROGRESS )
- closesocket(sock);
- sock = -1;
- return false;
+ if ( _timeout > 0 ) {
+ setSockTimeouts( sock, _timeout );
}
-
-#endif
-
+
ConnectBG bg;
bg.sock = sock;
bg.farEnd = farEnd;
bg.go();
- // int res = ::connect(sock, (sockaddr *) &farEnd.sa, farEnd.addressSize);
if ( bg.wait(5000) ) {
if ( bg.res ) {
closesocket(sock);
@@ -245,7 +365,8 @@ namespace mongo {
return false;
}
- disableNagle(sock);
+ if (farEnd.getType() != AF_UNIX)
+ disableNagle(sock);
#ifdef SO_NOSIGPIPE
// osx
@@ -257,94 +378,65 @@ namespace mongo {
}
bool MessagingPort::recv(Message& m) {
-again:
- mmm( out() << "* recv() sock:" << this->sock << endl; )
- int len = -1;
-
- char *lenbuf = (char *) &len;
- int lft = 4;
- while ( 1 ) {
- int x = recv( lenbuf, lft );
- if ( x == 0 ) {
- DEV out() << "MessagingPort recv() conn closed? " << farEnd.toString() << endl;
- m.reset();
- return false;
- }
- if ( x < 0 ) {
- log() << "MessagingPort recv() " << OUTPUT_ERRNO << " " << farEnd.toString()<<endl;
- m.reset();
- return false;
- }
- lft -= x;
- if ( lft == 0 )
- break;
- lenbuf += x;
- log() << "MessagingPort recv() got " << x << " bytes wanted 4, lft=" << lft << endl;
- assert( lft > 0 );
- }
-
- if ( len < 0 || len > 16000000 ) {
- if ( len == -1 ) {
- // Endian check from the database, after connecting, to see what mode server is running in.
- unsigned foo = 0x10203040;
- int x = send( (char *) &foo, 4 );
- if ( x <= 0 ) {
- log() << "MessagingPort endian send() " << OUTPUT_ERRNO << ' ' << farEnd.toString() << endl;
+ try {
+ again:
+ mmm( log() << "* recv() sock:" << this->sock << endl; )
+ int len = -1;
+
+ char *lenbuf = (char *) &len;
+ int lft = 4;
+ recv( lenbuf, lft );
+
+ if ( len < 16 || len > 16000000 ) { // messages must be large enough for headers
+ if ( len == -1 ) {
+ // Endian check from the database, after connecting, to see what mode server is running in.
+ unsigned foo = 0x10203040;
+ send( (char *) &foo, 4, "endian" );
+ goto again;
+ }
+
+ if ( len == 542393671 ){
+ // an http GET
+ log(_logLevel) << "looks like you're trying to access db over http on native driver port. please add 1000 for webserver" << endl;
+ string msg = "You are trying to access MongoDB on the native driver port. For http diagnostic access, add 1000 to the port number\n";
+ stringstream ss;
+ ss << "HTTP/1.0 200 OK\r\nConnection: close\r\nContent-Type: text/plain\r\nContent-Length: " << msg.size() << "\r\n\r\n" << msg;
+ string s = ss.str();
+ send( s.c_str(), s.size(), "http" );
return false;
}
- goto again;
- }
-
- if ( len == 542393671 ){
- // an http GET
- log() << "looks like you're trying to access db over http on native driver port. please add 1000 for webserver" << endl;
- string msg = "You are trying to access MongoDB on the native driver port. For http diagnostic access, add 1000 to the port number\n";
- stringstream ss;
- ss << "HTTP/1.0 200 OK\r\nConnection: close\r\nContent-Type: text/plain\r\nContent-Length: " << msg.size() << "\r\n\r\n" << msg;
- string s = ss.str();
- send( s.c_str(), s.size() );
- return false;
- }
- log() << "bad recv() len: " << len << '\n';
- return false;
- }
-
- int z = (len+1023)&0xfffffc00;
- assert(z>=len);
- MsgData *md = (MsgData *) malloc(z);
- md->len = len;
-
- if ( len <= 0 ) {
- out() << "got a length of " << len << ", something is wrong" << endl;
- return false;
- }
-
- char *p = (char *) &md->id;
- int left = len -4;
- while ( 1 ) {
- int x = recv( p, left );
- if ( x == 0 ) {
- DEV out() << "MessagingPort::recv(): conn closed? " << farEnd.toString() << endl;
- m.reset();
+ log(_logLevel) << "bad recv() len: " << len << '\n';
return false;
}
- if ( x < 0 ) {
- log() << "MessagingPort recv() " << OUTPUT_ERRNO << ' ' << farEnd.toString() << endl;
- m.reset();
- return false;
+
+ int z = (len+1023)&0xfffffc00;
+ assert(z>=len);
+ MsgData *md = (MsgData *) malloc(z);
+ assert(md);
+ md->len = len;
+
+ char *p = (char *) &md->id;
+ int left = len -4;
+
+ try {
+ recv( p, left );
+ } catch (...) {
+ free(md);
+ throw;
}
- left -= x;
- p += x;
- if ( left <= 0 )
- break;
+
+ m.setData(md, true);
+ return true;
+
+ } catch ( const SocketException & e ) {
+ log(_logLevel + (e.shouldPrint() ? 0 : 1) ) << "SocketException: " << e << endl;
+ m.reset();
+ return false;
}
-
- m.setData(md, true);
- return true;
}
-
+
void MessagingPort::reply(Message& received, Message& response) {
- say(/*received.from, */response, received.data->id);
+ say(/*received.from, */response, received.header()->id);
}
void MessagingPort::reply(Message& received, Message& response, MSGID responseTo) {
@@ -352,79 +444,171 @@ again:
}
bool MessagingPort::call(Message& toSend, Message& response) {
- mmm( out() << "*call()" << endl; )
- MSGID old = toSend.data->id;
+ mmm( log() << "*call()" << endl; )
+ MSGID old = toSend.header()->id;
say(/*to,*/ toSend);
while ( 1 ) {
bool ok = recv(response);
if ( !ok )
return false;
- //out() << "got response: " << response.data->responseTo << endl;
- if ( response.data->responseTo == toSend.data->id )
+ //log() << "got response: " << response.data->responseTo << endl;
+ if ( response.header()->responseTo == toSend.header()->id )
break;
- out() << "********************" << endl;
- out() << "ERROR: MessagingPort::call() wrong id got:" << (unsigned)response.data->responseTo << " expect:" << (unsigned)toSend.data->id << endl;
- out() << " toSend op: " << toSend.data->operation() << " old id:" << (unsigned)old << endl;
- out() << " response msgid:" << (unsigned)response.data->id << endl;
- out() << " response len: " << (unsigned)response.data->len << endl;
- out() << " response op: " << response.data->operation() << endl;
- out() << " farEnd: " << farEnd << endl;
+ log() << "********************" << endl;
+ log() << "ERROR: MessagingPort::call() wrong id got:" << hex << (unsigned)response.header()->responseTo << " expect:" << (unsigned)toSend.header()->id << endl;
+ log() << " toSend op: " << toSend.operation() << " old id:" << (unsigned)old << endl;
+ log() << " response msgid:" << (unsigned)response.header()->id << endl;
+ log() << " response len: " << (unsigned)response.header()->len << endl;
+ log() << " response op: " << response.operation() << endl;
+ log() << " farEnd: " << farEnd << endl;
assert(false);
response.reset();
}
- mmm( out() << "*call() end" << endl; )
+ mmm( log() << "*call() end" << endl; )
return true;
}
void MessagingPort::say(Message& toSend, int responseTo) {
- assert( toSend.data );
- mmm( out() << "* say() sock:" << this->sock << " thr:" << GetCurrentThreadId() << endl; )
- toSend.data->id = nextMessageId();
- toSend.data->responseTo = responseTo;
-
- int x = -100;
+ assert( !toSend.empty() );
+ mmm( log() << "* say() sock:" << this->sock << " thr:" << GetCurrentThreadId() << endl; )
+ toSend.header()->id = nextMessageId();
+ toSend.header()->responseTo = responseTo;
if ( piggyBackData && piggyBackData->len() ) {
- mmm( out() << "* have piggy back" << endl; )
- if ( ( piggyBackData->len() + toSend.data->len ) > 1300 ) {
+ mmm( log() << "* have piggy back" << endl; )
+ if ( ( piggyBackData->len() + toSend.header()->len ) > 1300 ) {
// won't fit in a packet - so just send it off
piggyBackData->flush();
}
else {
piggyBackData->append( toSend );
- x = piggyBackData->flush();
+ piggyBackData->flush();
+ return;
}
}
- if ( x == -100 )
- x = send( (char*)toSend.data, toSend.data->len );
-
- if ( x <= 0 ) {
- log() << "MessagingPort say send() " << OUTPUT_ERRNO << ' ' << farEnd.toString() << endl;
- throw SocketException();
- }
-
+ toSend.send( *this, "say" );
}
- int MessagingPort::send( const char * data , const int len ){
- return ::send( sock , data , len , portSendFlags );
+ // sends all data or throws an exception
+ void MessagingPort::send( const char * data , int len, const char *context ) {
+ while( len > 0 ) {
+ int ret = ::send( sock , data , len , portSendFlags );
+ if ( ret == -1 ) {
+ if ( errno != EAGAIN || _timeout == 0 ) {
+ log(_logLevel) << "MessagingPort " << context << " send() " << errnoWithDescription() << ' ' << farEnd.toString() << endl;
+ throw SocketException( SocketException::SEND_ERROR );
+ } else {
+ if ( !serverAlive( farEnd.toString() ) ) {
+ log(_logLevel) << "MessagingPort " << context << " send() remote dead " << farEnd.toString() << endl;
+ throw SocketException( SocketException::SEND_ERROR );
+ }
+ }
+ } else {
+ assert( ret <= len );
+ len -= ret;
+ data += ret;
+ }
+ }
}
- int MessagingPort::recv( char * buf , int max ){
- return ::recv( sock , buf , max , portRecvFlags );
+ // sends all data or throws an exception
+ void MessagingPort::send( const vector< pair< char *, int > > &data, const char *context ){
+#if defined(_WIN32)
+ // TODO use scatter/gather api
+ for( vector< pair< char *, int > >::const_iterator i = data.begin(); i != data.end(); ++i ) {
+ char * data = i->first;
+ int len = i->second;
+ send( data, len, context );
+ }
+#else
+ vector< struct iovec > d( data.size() );
+ int i = 0;
+ for( vector< pair< char *, int > >::const_iterator j = data.begin(); j != data.end(); ++j ) {
+ if ( j->second > 0 ) {
+ d[ i ].iov_base = j->first;
+ d[ i ].iov_len = j->second;
+ ++i;
+ }
+ }
+ struct msghdr meta;
+ memset( &meta, 0, sizeof( meta ) );
+ meta.msg_iov = &d[ 0 ];
+ meta.msg_iovlen = d.size();
+
+ while( meta.msg_iovlen > 0 ) {
+ int ret = ::sendmsg( sock , &meta , portSendFlags );
+ if ( ret == -1 ) {
+ if ( errno != EAGAIN || _timeout == 0 ) {
+ log(_logLevel) << "MessagingPort " << context << " send() " << errnoWithDescription() << ' ' << farEnd.toString() << endl;
+ throw SocketException( SocketException::SEND_ERROR );
+ } else {
+ if ( !serverAlive( farEnd.toString() ) ) {
+ log(_logLevel) << "MessagingPort " << context << " send() remote dead " << farEnd.toString() << endl;
+ throw SocketException( SocketException::SEND_ERROR );
+ }
+ }
+ } else {
+ struct iovec *& i = meta.msg_iov;
+ while( ret > 0 ) {
+ if ( i->iov_len > unsigned( ret ) ) {
+ i->iov_len -= ret;
+ i->iov_base = (char*)(i->iov_base) + ret;
+ ret = 0;
+ } else {
+ ret -= i->iov_len;
+ ++i;
+ --(meta.msg_iovlen);
+ }
+ }
+ }
+ }
+#endif
}
+ void MessagingPort::recv( char * buf , int len ){
+ while( len > 0 ) {
+ int ret = ::recv( sock , buf , len , portRecvFlags );
+ if ( ret == 0 ) {
+ log(3) << "MessagingPort recv() conn closed? " << farEnd.toString() << endl;
+ throw SocketException( SocketException::CLOSED );
+ }
+ if ( ret == -1 ) {
+ int e = errno;
+ if ( e != EAGAIN || _timeout == 0 ) {
+ log(_logLevel) << "MessagingPort recv() " << errnoWithDescription(e) << " " << farEnd.toString() <<endl;
+ throw SocketException( SocketException::RECV_ERROR );
+ } else {
+ if ( !serverAlive( farEnd.toString() ) ) {
+ log(_logLevel) << "MessagingPort recv() remote dead " << farEnd.toString() << endl;
+ throw SocketException( SocketException::RECV_ERROR );
+ }
+ }
+ } else {
+ if ( len <= 4 && ret != len )
+ log(_logLevel) << "MessagingPort recv() got " << ret << " bytes wanted len=" << len << endl;
+ assert( ret <= len );
+ len -= ret;
+ buf += ret;
+ }
+ }
+ }
+
+ int MessagingPort::unsafe_recv( char *buf, int max ) {
+ return ::recv( sock , buf , max , portRecvFlags );
+ }
+
void MessagingPort::piggyBack( Message& toSend , int responseTo ) {
- if ( toSend.data->len > 1300 ) {
+ if ( toSend.header()->len > 1300 ) {
// not worth saving because its almost an entire packet
say( toSend );
return;
}
// we're going to be storing this, so need to set it up
- toSend.data->id = nextMessageId();
- toSend.data->responseTo = responseTo;
+ toSend.header()->id = nextMessageId();
+ toSend.header()->responseTo = responseTo;
if ( ! piggyBackData )
piggyBackData = new PiggyBackData( this );
@@ -432,10 +616,15 @@ again:
piggyBackData->append( toSend );
}
- unsigned MessagingPort::remotePort(){
+ unsigned MessagingPort::remotePort() const {
return farEnd.getPort();
}
+ HostAndPort MessagingPort::remote() const {
+ return farEnd;
+ }
+
+
MSGID NextMsgId;
bool usingClientIds = 0;
ThreadLocalValue<int> clientId;
@@ -473,4 +662,62 @@ again:
return clientId.get();
}
+ int getMaxConnections(){
+#ifdef _WIN32
+ return 20000;
+#else
+ struct rlimit limit;
+ assert( getrlimit(RLIMIT_NOFILE,&limit) == 0 );
+
+ int max = (int)(limit.rlim_cur * .8);
+
+ log(1) << "fd limit"
+ << " hard:" << limit.rlim_max
+ << " soft:" << limit.rlim_cur
+ << " max conn: " << max
+ << endl;
+
+ if ( max > 20000 )
+ max = 20000;
+
+ return max;
+#endif
+ }
+
+ void checkTicketNumbers(){
+ connTicketHolder.resize( getMaxConnections() );
+ }
+
+ TicketHolder connTicketHolder(20000);
+
+ namespace {
+ map<string, bool> isSelfCache; // host, isSelf
+ }
+
+ bool HostAndPort::isSelf() const {
+ int p = _port == -1 ? CmdLine::DefaultDBPort : _port;
+
+ if( p != cmdLine.port ){
+ return false;
+ } else if (sameHostname(getHostName(), _host) || isLocalHost()) {
+ return true;
+ } else {
+ map<string, bool>::const_iterator it = isSelfCache.find(_host);
+ if (it != isSelfCache.end()){
+ return it->second;
+ }
+
+ SockAddr addr (_host.c_str(), 0); // port 0 is dynamically assigned
+ SOCKET sock = ::socket(addr.getType(), SOCK_STREAM, 0);
+ assert(sock != INVALID_SOCKET);
+
+ bool ret = (::bind(sock, addr.raw(), addr.addressSize) == 0);
+ isSelfCache[_host] = ret;
+
+ closesocket(sock);
+
+ return ret;
+ }
+ }
+
} // namespace mongo
diff --git a/util/message.h b/util/message.h
index 5dccaef..13f31df 100644
--- a/util/message.h
+++ b/util/message.h
@@ -1,4 +1,4 @@
-// message.h
+// Message.h
/* Copyright 2009 10gen Inc.
*
@@ -18,10 +18,13 @@
#pragma once
#include "../util/sock.h"
-#include "../util/atomic_int.h"
+#include "../bson/util/atomic_int.h"
+#include "hostandport.h"
namespace mongo {
+ extern bool noUnixSocket;
+
class Message;
class MessagingPort;
class PiggyBackData;
@@ -29,18 +32,46 @@ namespace mongo {
class Listener {
public:
- Listener(const string &_ip, int p) : ip(_ip), port(p) { }
- virtual ~Listener() {}
- bool init(); // set up socket
- int socket() const { return sock; }
- void listen(); // never returns (start a thread)
+ Listener(const string &ip, int p, bool logConnect=true ) : _port(p), _ip(ip), _logConnect(logConnect), _elapsedTime(0){ }
+ virtual ~Listener() {
+ if ( _timeTracker == this )
+ _timeTracker = 0;
+ }
+ void initAndListen(); // never returns unless error (start a thread)
/* spawn a thread, etc., then return */
- virtual void accepted(MessagingPort *mp) = 0;
+ virtual void accepted(int sock, const SockAddr& from);
+ virtual void accepted(MessagingPort *mp){
+ assert(!"You must overwrite one of the accepted methods");
+ }
+
+ const int _port;
+
+ /**
+ * @return a rough estimate of elepased time since the server started
+ */
+ long long getMyElapsedTimeMillis() const { return _elapsedTime; }
+
+ void setAsTimeTracker(){
+ _timeTracker = this;
+ }
+
+ static const Listener* getTimeTracker(){
+ return _timeTracker;
+ }
+
+ static long long getElapsedTimeMillis() {
+ if ( _timeTracker )
+ return _timeTracker->getMyElapsedTimeMillis();
+ return 0;
+ }
+
private:
- string ip;
- int port;
- int sock;
+ string _ip;
+ bool _logConnect;
+ long long _elapsedTime;
+
+ static const Listener* _timeTracker;
};
class AbstractMessagingPort {
@@ -49,13 +80,25 @@ namespace mongo {
virtual void reply(Message& received, Message& response, MSGID responseTo) = 0; // like the reply below, but doesn't rely on received.data still being available
virtual void reply(Message& received, Message& response) = 0;
- virtual unsigned remotePort() = 0 ;
+ virtual HostAndPort remote() const = 0;
+ virtual unsigned remotePort() const = 0;
+
+ virtual int getClientId(){
+ int x = remotePort();
+ x = x << 16;
+ return x;
+ }
};
class MessagingPort : public AbstractMessagingPort {
public:
- MessagingPort(int sock, SockAddr& farEnd);
- MessagingPort();
+ MessagingPort(int sock, const SockAddr& farEnd);
+
+ // in some cases the timeout will actually be 2x this value - eg we do a partial send,
+ // then the timeout fires, then we try to send again, then the timeout fires again with
+ // no data sent, then we detect that the other side is down
+ MessagingPort(int timeout = 0, int logLevel = 0 );
+
virtual ~MessagingPort();
void shutdown();
@@ -73,22 +116,28 @@ namespace mongo {
void piggyBack( Message& toSend , int responseTo = -1 );
- virtual unsigned remotePort();
+ virtual unsigned remotePort() const;
+ virtual HostAndPort remote() const;
+
+ // send len or throw SocketException
+ void send( const char * data , int len, const char *context );
+ void send( const vector< pair< char *, int > > &data, const char *context );
- int send( const char * data , const int len );
- int recv( char * data , int max );
+ // recv len or throw SocketException
+ void recv( char * data , int len );
+
+ int unsafe_recv( char *buf, int max );
private:
int sock;
PiggyBackData * piggyBackData;
public:
SockAddr farEnd;
+ int _timeout;
+ int _logLevel; // passed to log() when logging errors
friend class PiggyBackData;
};
- //#pragma pack()
-#pragma pack(1)
-
enum Operations {
opReply = 1, /* reply. responseTo is set. */
dbMsg = 1000, /* generic msg command followed by a string */
@@ -121,6 +170,27 @@ namespace mongo {
}
}
+#pragma pack(1)
+/* see http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol
+*/
+struct MSGHEADER {
+ int messageLength; // total message size, including this
+ int requestID; // identifier for this message
+ int responseTo; // requestID from the original request
+ // (used in reponses from db)
+ int opCode;
+};
+struct OP_GETMORE : public MSGHEADER {
+ MSGHEADER header; // standard message header
+ int ZERO_or_flags; // 0 - reserved for future use
+ //cstring fullCollectionName; // "dbname.collectionname"
+ //int32 numberToReturn; // number of documents to return
+ //int64 cursorID; // cursorID from the OP_REPLY
+};
+#pragma pack()
+
+#pragma pack(1)
+ /* todo merge this with MSGHEADER (or inherit from it). */
struct MsgData {
int len; /* len of the msg, including this field */
MSGID id; /* request/reply id's match... */
@@ -146,88 +216,232 @@ namespace mongo {
return true;
}
+ long long getCursor(){
+ assert( responseTo > 0 );
+ assert( _operation == opReply );
+ long long * l = (long long *)(_data + 4);
+ return l[0];
+ }
+
int dataLen(); // len without header
};
const int MsgDataHeaderSize = sizeof(MsgData) - 4;
inline int MsgData::dataLen() {
return len - MsgDataHeaderSize;
}
-
#pragma pack()
class Message {
public:
- Message() {
- data = 0;
- freeIt = false;
- }
- Message( void * _data , bool _freeIt ) {
- data = (MsgData*)_data;
- freeIt = _freeIt;
+ // we assume here that a vector with initial size 0 does no allocation (0 is the default, but wanted to make it explicit).
+ Message() : _buf( 0 ), _data( 0 ), _freeIt( false ) {}
+ Message( void * data , bool freeIt ) :
+ _buf( 0 ), _data( 0 ), _freeIt( false ) {
+ _setData( reinterpret_cast< MsgData* >( data ), freeIt );
};
+ Message(Message& r) : _buf( 0 ), _data( 0 ), _freeIt( false ) {
+ *this = r;
+ }
~Message() {
reset();
}
- SockAddr from;
- MsgData *data;
+ SockAddr _from;
- int operation() const {
- return data->operation();
+ MsgData *header() const {
+ assert( !empty() );
+ return _buf ? _buf : reinterpret_cast< MsgData* > ( _data[ 0 ].first );
+ }
+ int operation() const { return header()->operation(); }
+
+ MsgData *singleData() const {
+ massert( 13273, "single data buffer expected", _buf );
+ return header();
}
+ bool empty() const { return !_buf && _data.empty(); }
+
+ int size() const{
+ int res = 0;
+ if ( _buf ){
+ res = _buf->len;
+ } else {
+ for (MsgVec::const_iterator it = _data.begin(); it != _data.end(); ++it){
+ res += it->second;
+ }
+ }
+ return res;
+ }
+
+ // concat multiple buffers - noop if <2 buffers already, otherwise can be expensive copy
+ // can get rid of this if we make response handling smarter
+ void concat() {
+ if ( _buf || empty() ) {
+ return;
+ }
+
+ assert( _freeIt );
+ int totalSize = 0;
+ for( vector< pair< char *, int > >::const_iterator i = _data.begin(); i != _data.end(); ++i ) {
+ totalSize += i->second;
+ }
+ char *buf = (char*)malloc( totalSize );
+ char *p = buf;
+ for( vector< pair< char *, int > >::const_iterator i = _data.begin(); i != _data.end(); ++i ) {
+ memcpy( p, i->first, i->second );
+ p += i->second;
+ }
+ reset();
+ _setData( (MsgData*)buf, true );
+ }
+
+ // vector swap() so this is fast
Message& operator=(Message& r) {
- assert( data == 0 );
- data = r.data;
- assert( r.freeIt );
- r.freeIt = false;
- r.data = 0;
- freeIt = true;
+ assert( empty() );
+ assert( r._freeIt );
+ _buf = r._buf;
+ r._buf = 0;
+ if ( r._data.size() > 0 ) {
+ _data.swap( r._data );
+ }
+ r._freeIt = false;
+ _freeIt = true;
return *this;
}
void reset() {
- if ( freeIt && data )
- free(data);
- data = 0;
- freeIt = false;
+ if ( _freeIt ) {
+ if ( _buf ) {
+ free( _buf );
+ }
+ for( vector< pair< char *, int > >::const_iterator i = _data.begin(); i != _data.end(); ++i ) {
+ free(i->first);
+ }
+ }
+ _buf = 0;
+ _data.clear();
+ _freeIt = false;
}
- void setData(MsgData *d, bool _freeIt) {
- assert( data == 0 );
- freeIt = _freeIt;
- data = d;
+ // use to add a buffer
+ // assumes message will free everything
+ void appendData(char *d, int size) {
+ if ( size <= 0 ) {
+ return;
+ }
+ if ( empty() ) {
+ MsgData *md = (MsgData*)d;
+ md->len = size; // can be updated later if more buffers added
+ _setData( md, true );
+ return;
+ }
+ assert( _freeIt );
+ if ( _buf ) {
+ _data.push_back( make_pair( (char*)_buf, _buf->len ) );
+ _buf = 0;
+ }
+ _data.push_back( make_pair( d, size ) );
+ header()->len += size;
+ }
+
+ // use to set first buffer if empty
+ void setData(MsgData *d, bool freeIt) {
+ assert( empty() );
+ _setData( d, freeIt );
}
void setData(int operation, const char *msgtxt) {
setData(operation, msgtxt, strlen(msgtxt)+1);
}
void setData(int operation, const char *msgdata, size_t len) {
- assert(data == 0);
+ assert( empty() );
size_t dataLen = len + sizeof(MsgData) - 4;
MsgData *d = (MsgData *) malloc(dataLen);
memcpy(d->_data, msgdata, len);
d->len = fixEndian(dataLen);
d->setOperation(operation);
- freeIt= true;
- data = d;
+ _setData( d, true );
}
bool doIFreeIt() {
- return freeIt;
+ return _freeIt;
+ }
+
+ void send( MessagingPort &p, const char *context ) {
+ if ( empty() ) {
+ return;
+ }
+ if ( _buf != 0 ) {
+ p.send( (char*)_buf, _buf->len, context );
+ } else {
+ p.send( _data, context );
+ }
}
private:
- bool freeIt;
+ void _setData( MsgData *d, bool freeIt ) {
+ _freeIt = freeIt;
+ _buf = d;
+ }
+ // if just one buffer, keep it in _buf, otherwise keep a sequence of buffers in _data
+ MsgData * _buf;
+ // byte buffer(s) - the first must contain at least a full MsgData unless using _buf for storage instead
+ typedef vector< pair< char*, int > > MsgVec;
+ MsgVec _data;
+ bool _freeIt;
};
class SocketException : public DBException {
public:
- virtual const char* what() const throw() { return "socket exception"; }
- virtual int getCode(){ return 9001; }
+ enum Type { CLOSED , RECV_ERROR , SEND_ERROR } type;
+ SocketException( Type t ) : DBException( "socket exception" , 9001 ) , type(t){}
+
+ bool shouldPrint() const {
+ return type != CLOSED;
+ }
+
};
MSGID nextMessageId();
void setClientId( int id );
int getClientId();
+
+ extern TicketHolder connTicketHolder;
+
+ class ElapsedTracker {
+ public:
+ ElapsedTracker( int hitsBetweenMarks , int msBetweenMarks )
+ : _h( hitsBetweenMarks ) , _ms( msBetweenMarks ) , _pings(0){
+ _last = Listener::getElapsedTimeMillis();
+ }
+
+ /**
+ * call this for every iteration
+ * returns true if one of the triggers has gone off
+ */
+ bool ping(){
+ if ( ( ++_pings % _h ) == 0 ){
+ _last = Listener::getElapsedTimeMillis();
+ return true;
+ }
+
+ long long now = Listener::getElapsedTimeMillis();
+ if ( now - _last > _ms ){
+ _last = now;
+ return true;
+ }
+
+ return false;
+ }
+
+ private:
+ int _h;
+ int _ms;
+
+ unsigned long long _pings;
+
+ long long _last;
+
+ };
+
} // namespace mongo
diff --git a/util/message_server.h b/util/message_server.h
index cc40b76..9d6a8f2 100644
--- a/util/message_server.h
+++ b/util/message_server.h
@@ -22,7 +22,7 @@
#pragma once
-#include "../stdafx.h"
+#include "../pch.h"
namespace mongo {
@@ -30,20 +30,23 @@ namespace mongo {
public:
virtual ~MessageHandler(){}
virtual void process( Message& m , AbstractMessagingPort* p ) = 0;
+ virtual void disconnected( AbstractMessagingPort* p ) = 0;
};
-
+
class MessageServer {
public:
- MessageServer( int port , MessageHandler * handler ) : _port( port ) , _handler( handler ){}
- virtual ~MessageServer(){}
+ struct Options {
+ int port; // port to bind to
+ string ipList; // addresses to bind to
+ Options() : port(0), ipList(""){}
+ };
+
+ virtual ~MessageServer(){}
virtual void run() = 0;
-
- protected:
-
- int _port;
- MessageHandler* _handler;
+ virtual void setAsTimeTracker() = 0;
};
- MessageServer * createServer( int port , MessageHandler * handler );
+ // TODO use a factory here to decide between port and asio variations
+ MessageServer * createServer( const MessageServer::Options& opts , MessageHandler * handler );
}
diff --git a/util/message_server_asio.cpp b/util/message_server_asio.cpp
index 7fca29a..0c9479c 100644
--- a/util/message_server_asio.cpp
+++ b/util/message_server_asio.cpp
@@ -27,7 +27,7 @@
#include "message.h"
#include "message_server.h"
-#include "../util/mvar.h"
+#include "../util/concurrency/mvar.h"
using namespace boost;
using namespace boost::asio;
@@ -204,9 +204,11 @@ namespace mongo {
class AsyncMessageServer : public MessageServer {
public:
- AsyncMessageServer( int port , MessageHandler * handler )
- : MessageServer( port , handler )
- , _endpoint( tcp::v4() , port )
+ // TODO accept an IP address to bind to
+ AsyncMessageServer( const MessageServer::Options& opts , MessageHandler * handler )
+ : _port( opts.port )
+ , _handler(handler)
+ , _endpoint( tcp::v4() , opts.port )
, _acceptor( _ioservice , _endpoint )
{
_accept();
@@ -232,7 +234,7 @@ namespace mongo {
_accept();
}
- void _accept(){
+ void _accept( ){
shared_ptr<MessageServerSession> session( new MessageServerSession( _handler , _ioservice ) );
_acceptor.async_accept( session->socket() ,
boost::bind( &AsyncMessageServer::handleAccept,
@@ -243,13 +245,15 @@ namespace mongo {
}
private:
+ int _port;
+ MessageHandler * _handler;
io_service _ioservice;
tcp::endpoint _endpoint;
tcp::acceptor _acceptor;
};
- MessageServer * createServer( int port , MessageHandler * handler ){
- return new AsyncMessageServer( port , handler );
+ MessageServer * createServer( const MessageServer::Options& opts , MessageHandler * handler ){
+ return new AsyncMessageServer( opts , handler );
}
}
diff --git a/util/message_server_port.cpp b/util/message_server_port.cpp
index 2350ec2..9649e45 100644
--- a/util/message_server_port.cpp
+++ b/util/message_server_port.cpp
@@ -15,32 +15,41 @@
* limitations under the License.
*/
-#include "stdafx.h"
+#include "pch.h"
#ifndef USE_ASIO
#include "message.h"
#include "message_server.h"
+#include "../db/cmdline.h"
+
namespace mongo {
namespace pms {
- MessagingPort * grab = 0;
MessageHandler * handler;
-
- void threadRun(){
- assert( grab );
- auto_ptr<MessagingPort> p( grab );
- grab = 0;
+
+ void threadRun( MessagingPort * inPort){
+ assert( inPort );
+ setThreadName( "conn" );
+ TicketHolderReleaser connTicketReleaser( &connTicketHolder );
+
+ auto_ptr<MessagingPort> p( inPort );
+
+ string otherSide;
+
Message m;
try {
+ otherSide = p->farEnd.toString();
+
while ( 1 ){
m.reset();
if ( ! p->recv(m) ) {
- log() << "end connection " << p->farEnd.toString() << endl;
+ if( !cmdLine.quiet )
+ log() << "end connection " << otherSide << endl;
p->shutdown();
break;
}
@@ -48,42 +57,69 @@ namespace mongo {
handler->process( m , p.get() );
}
}
+ catch ( const SocketException& ){
+ log() << "unclean socket shutdown from: " << otherSide << endl;
+ }
+ catch ( const std::exception& e ){
+ problem() << "uncaught exception (" << e.what() << ")(" << demangleName( typeid(e) ) <<") in PortMessageServer::threadRun, closing connection" << endl;
+ }
catch ( ... ){
problem() << "uncaught exception in PortMessageServer::threadRun, closing connection" << endl;
}
+ handler->disconnected( p.get() );
}
}
class PortMessageServer : public MessageServer , public Listener {
public:
- PortMessageServer( int port , MessageHandler * handler ) :
- MessageServer( port , handler ) ,
- Listener( "", port ){
+ PortMessageServer( const MessageServer::Options& opts, MessageHandler * handler ) :
+ Listener( opts.ipList, opts.port ){
uassert( 10275 , "multiple PortMessageServer not supported" , ! pms::handler );
pms::handler = handler;
}
virtual void accepted(MessagingPort * p) {
- assert( ! pms::grab );
- pms::grab = p;
- boost::thread thr( pms::threadRun );
- while ( pms::grab )
- sleepmillis(1);
+
+ if ( ! connTicketHolder.tryAcquire() ){
+ log() << "connection refused because too many open connections: " << connTicketHolder.used() << endl;
+
+ // TODO: would be nice if we notified them...
+ p->shutdown();
+ delete p;
+
+ sleepmillis(2); // otherwise we'll hard loop
+ return;
+ }
+
+ try {
+ boost::thread thr( boost::bind( &pms::threadRun , p ) );
+ }
+ catch ( boost::thread_resource_error& ){
+ log() << "can't create new thread, closing connection" << endl;
+
+ p->shutdown();
+ delete p;
+
+ sleepmillis(2);
+ }
}
+ virtual void setAsTimeTracker(){
+ Listener::setAsTimeTracker();
+ }
+
void run(){
- assert( init() );
- listen();
+ initAndListen();
}
};
- MessageServer * createServer( int port , MessageHandler * handler ){
- return new PortMessageServer( port , handler );
+ MessageServer * createServer( const MessageServer::Options& opts , MessageHandler * handler ){
+ return new PortMessageServer( opts , handler );
}
}
diff --git a/util/miniwebserver.cpp b/util/miniwebserver.cpp
index 61619d8..0193c5d 100644
--- a/util/miniwebserver.cpp
+++ b/util/miniwebserver.cpp
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-#include "stdafx.h"
+#include "pch.h"
#include "miniwebserver.h"
#include "hex.h"
@@ -23,52 +23,22 @@
namespace mongo {
- MiniWebServer::MiniWebServer() {
- sock = 0;
- }
-
- bool MiniWebServer::init(const string &ip, int _port) {
- port = _port;
- SockAddr me;
- if ( ip.empty() )
- me = SockAddr( port );
- else
- me = SockAddr( ip.c_str(), port );
- sock = ::socket(AF_INET, SOCK_STREAM, 0);
- if ( sock == INVALID_SOCKET ) {
- log() << "ERROR: MiniWebServer listen(): invalid socket? " << OUTPUT_ERRNO << endl;
- return false;
- }
- prebindOptions( sock );
- if ( ::bind(sock, (sockaddr *) &me.sa, me.addressSize) != 0 ) {
- log() << "MiniWebServer: bind() failed port:" << port << " " << OUTPUT_ERRNO << endl;
- if ( errno == EADDRINUSE )
- log() << " addr already in use" << endl;
- closesocket(sock);
- return false;
- }
-
- if ( ::listen(sock, 16) != 0 ) {
- log() << "MiniWebServer: listen() failed " << OUTPUT_ERRNO << endl;
- closesocket(sock);
- return false;
- }
-
- return true;
- }
+ MiniWebServer::MiniWebServer(const string &ip, int port)
+ : Listener(ip, port, false)
+ {}
string MiniWebServer::parseURL( const char * buf ) {
- const char * urlStart = strstr( buf , " " );
+ const char * urlStart = strchr( buf , ' ' );
if ( ! urlStart )
return "/";
urlStart++;
- const char * end = strstr( urlStart , " " );
+ const char * end = strchr( urlStart , ' ' );
if ( ! end ) {
- end = strstr( urlStart , "\r" );
+ end = strchr( urlStart , '\r' );
if ( ! end ) {
- end = strstr( urlStart , "\n" );
+ end = strchr( urlStart , '\n' );
}
}
@@ -105,14 +75,14 @@ namespace mongo {
if ( eq == string::npos )
continue;
- b.append( urlDecode(cur.substr(0,eq)).c_str() , urlDecode(cur.substr(eq+1) ) );
+ b.append( urlDecode(cur.substr(0,eq)) , urlDecode(cur.substr(eq+1) ) );
}
params = b.obj();
}
string MiniWebServer::parseMethod( const char * headers ) {
- const char * end = strstr( headers , " " );
+ const char * end = strchr( headers , ' ' );
if ( ! end )
return "GET";
return string( headers , (int)(end-headers) );
@@ -139,17 +109,23 @@ namespace mongo {
}
void MiniWebServer::accepted(int s, const SockAddr &from) {
+ setSockTimeouts(s, 8);
char buf[4096];
int len = 0;
while ( 1 ) {
- int x = ::recv(s, buf + len, sizeof(buf) - 1 - len, 0);
+ int left = sizeof(buf) - 1 - len;
+ if( left == 0 )
+ break;
+ int x = ::recv(s, buf + len, left, 0);
if ( x <= 0 ) {
+ closesocket(s);
return;
}
len += x;
buf[ len ] = 0;
- if ( fullReceive( buf ) )
+ if ( fullReceive( buf ) ) {
break;
+ }
}
buf[len] = 0;
@@ -178,18 +154,23 @@ namespace mongo {
ss << "Content-Type: text/html\r\n";
}
else {
- for ( vector<string>::iterator i = headers.begin(); i != headers.end(); i++ )
+ for ( vector<string>::iterator i = headers.begin(); i != headers.end(); i++ ) {
+ assert( strncmp("Content-Length", i->c_str(), 14) );
ss << *i << "\r\n";
+ }
}
+ ss << "Connection: close\r\n";
+ ss << "Content-Length: " << responseMsg.size() << "\r\n";
ss << "\r\n";
ss << responseMsg;
string response = ss.str();
::send(s, response.c_str(), response.size(), 0);
+ closesocket(s);
}
string MiniWebServer::getHeader( const char * req , string wanted ){
- const char * headers = strstr( req , "\n" );
+ const char * headers = strchr( req , '\n' );
if ( ! headers )
return "";
pcrecpp::StringPiece input( headers + 1 );
@@ -203,26 +184,6 @@ namespace mongo {
}
return "";
}
-
- void MiniWebServer::run() {
- SockAddr from;
- while ( ! inShutdown() ) {
- int s = accept(sock, (sockaddr *) &from.sa, &from.addressSize);
- if ( s < 0 ) {
- if ( errno == ECONNABORTED ) {
- log() << "Listener on port " << port << " aborted." << endl;
- return;
- }
- log() << "MiniWebServer: accept() returns " << s << " " << OUTPUT_ERRNO << endl;
- sleepmillis(200);
- continue;
- }
- disableNagle(s);
- RARELY log() << "MiniWebServer: connection accepted from " << from.toString() << endl;
- accepted( s, from );
- closesocket(s);
- }
- }
string MiniWebServer::urlDecode(const char* s){
stringstream out;
diff --git a/util/miniwebserver.h b/util/miniwebserver.h
index bdd2873..bbd1ba2 100644
--- a/util/miniwebserver.h
+++ b/util/miniwebserver.h
@@ -17,20 +17,17 @@
#pragma once
-#include "../stdafx.h"
+#include "../pch.h"
#include "message.h"
#include "../db/jsobj.h"
namespace mongo {
- class MiniWebServer {
+ class MiniWebServer : public Listener {
public:
- MiniWebServer();
+ MiniWebServer(const string &ip, int _port);
virtual ~MiniWebServer() {}
- bool init(const string &ip, int _port);
- void run();
-
virtual void doRequest(
const char *rq, // the full request
string url,
@@ -41,13 +38,13 @@ namespace mongo {
const SockAddr &from
) = 0;
- int socket() const { return sock; }
+ // --- static helpers ----
+
+ static void parseParams( BSONObj & params , string query );
- protected:
- string parseURL( const char * buf );
- string parseMethod( const char * headers );
- string getHeader( const char * headers , string name );
- void parseParams( BSONObj & params , string query );
+ static string parseURL( const char * buf );
+ static string parseMethod( const char * headers );
+ static string getHeader( const char * headers , string name );
static const char *body( const char *buf );
static string urlDecode(const char* s);
@@ -56,9 +53,6 @@ namespace mongo {
private:
void accepted(int s, const SockAddr &from);
static bool fullReceive( const char *buf );
-
- int port;
- int sock;
};
} // namespace mongo
diff --git a/util/mmap.cpp b/util/mmap.cpp
index f6bbc73..b9c1994 100644
--- a/util/mmap.cpp
+++ b/util/mmap.cpp
@@ -15,36 +15,67 @@
* limitations under the License.
*/
-#include "stdafx.h"
+#include "pch.h"
#include "mmap.h"
#include "processinfo.h"
+#include "concurrency/rwlock.h"
namespace mongo {
- set<MemoryMappedFile*> mmfiles;
- mongo::mutex mmmutex;
+ /*static*/ void MemoryMappedFile::updateLength( const char *filename, long &length ) {
+ if ( !boost::filesystem::exists( filename ) )
+ return;
+ // make sure we map full length if preexisting file.
+ boost::uintmax_t l = boost::filesystem::file_size( filename );
+ assert( l <= 0x7fffffff );
+ length = (long) l;
+ }
- MemoryMappedFile::~MemoryMappedFile() {
- close();
- scoped_lock lk( mmmutex );
- mmfiles.erase(this);
+ void* MemoryMappedFile::map(const char *filename) {
+ boost::uintmax_t l = boost::filesystem::file_size( filename );
+ assert( l <= 0x7fffffff );
+ long i = (long)l;
+ return map( filename , i );
}
- void MemoryMappedFile::created(){
- scoped_lock lk( mmmutex );
- mmfiles.insert(this);
+ void printMemInfo( const char * where ){
+ cout << "mem info: ";
+ if ( where )
+ cout << where << " ";
+ ProcessInfo pi;
+ if ( ! pi.supported() ){
+ cout << " not supported" << endl;
+ return;
+ }
+
+ cout << "vsize: " << pi.getVirtualMemorySize() << " resident: " << pi.getResidentSize() << " mapped: " << ( MemoryMappedFile::totalMappedLength() / ( 1024 * 1024 ) ) << endl;
+ }
+
+ /* --- MongoFile -------------------------------------------------
+ this is the administrative stuff
+ */
+
+ static set<MongoFile*> mmfiles;
+ static RWLock mmmutex("rw:mmmutex");
+
+ void MongoFile::destroyed() {
+ rwlock lk( mmmutex , true );
+ mmfiles.erase(this);
}
/*static*/
- int closingAllFiles = 0;
- void MemoryMappedFile::closeAllFiles( stringstream &message ) {
+ void MongoFile::closeAllFiles( stringstream &message ) {
+ static int closingAllFiles = 0;
if ( closingAllFiles ) {
message << "warning closingAllFiles=" << closingAllFiles << endl;
return;
}
++closingAllFiles;
+
+ rwlock lk( mmmutex , true );
+
ProgressMeter pm( mmfiles.size() , 2 , 1 );
- for ( set<MemoryMappedFile*>::iterator i = mmfiles.begin(); i != mmfiles.end(); i++ ){
+ for ( set<MongoFile*>::iterator i = mmfiles.begin(); i != mmfiles.end(); i++ ){
(*i)->close();
pm.hit();
}
@@ -52,59 +83,78 @@ namespace mongo {
--closingAllFiles;
}
- long long MemoryMappedFile::totalMappedLength(){
+ /*static*/ long long MongoFile::totalMappedLength(){
unsigned long long total = 0;
- scoped_lock lk( mmmutex );
- for ( set<MemoryMappedFile*>::iterator i = mmfiles.begin(); i != mmfiles.end(); i++ )
+ rwlock lk( mmmutex , false );
+ for ( set<MongoFile*>::iterator i = mmfiles.begin(); i != mmfiles.end(); i++ )
total += (*i)->length();
return total;
}
- int MemoryMappedFile::flushAll( bool sync ){
- int num = 0;
-
- scoped_lock lk( mmmutex );
- for ( set<MemoryMappedFile*>::iterator i = mmfiles.begin(); i != mmfiles.end(); i++ ){
- num++;
- MemoryMappedFile * mmf = *i;
- if ( ! mmf )
- continue;
- mmf->flush( sync );
+ /*static*/ int MongoFile::flushAll( bool sync ){
+ if ( ! sync ){
+ int num = 0;
+ rwlock lk( mmmutex , false );
+ for ( set<MongoFile*>::iterator i = mmfiles.begin(); i != mmfiles.end(); i++ ){
+ num++;
+ MongoFile * mmf = *i;
+ if ( ! mmf )
+ continue;
+
+ mmf->flush( sync );
+ }
+ return num;
+ }
+
+ // want to do it sync
+ set<MongoFile*> seen;
+ while ( true ){
+ auto_ptr<Flushable> f;
+ {
+ rwlock lk( mmmutex , false );
+ for ( set<MongoFile*>::iterator i = mmfiles.begin(); i != mmfiles.end(); i++ ){
+ MongoFile * mmf = *i;
+ if ( ! mmf )
+ continue;
+ if ( seen.count( mmf ) )
+ continue;
+ f.reset( mmf->prepareFlush() );
+ seen.insert( mmf );
+ break;
+ }
+ }
+ if ( ! f.get() )
+ break;
+
+ f->flush();
}
- return num;
+ return seen.size();
}
-
- void MemoryMappedFile::updateLength( const char *filename, long &length ) {
- if ( !boost::filesystem::exists( filename ) )
- return;
- // make sure we map full length if preexisting file.
- boost::uintmax_t l = boost::filesystem::file_size( filename );
- assert( l <= 0x7fffffff );
- length = (long) l;
+ void MongoFile::created(){
+ rwlock lk( mmmutex , true );
+ mmfiles.insert(this);
}
- void* MemoryMappedFile::map(const char *filename) {
- boost::uintmax_t l = boost::filesystem::file_size( filename );
- assert( l <= 0x7fffffff );
- long i = (long)l;
- return map( filename , i );
- }
+#ifdef _DEBUG
- void printMemInfo( const char * where ){
- cout << "mem info: ";
- if ( where )
- cout << where << " ";
- ProcessInfo pi;
- if ( ! pi.supported() ){
- cout << " not supported" << endl;
- return;
+ void MongoFile::lockAll() {
+ rwlock lk( mmmutex , false );
+ for ( set<MongoFile*>::iterator i = mmfiles.begin(); i != mmfiles.end(); i++ ){
+ MongoFile * mmf = *i;
+ if (mmf) mmf->_lock();
}
-
- cout << "vsize: " << pi.getVirtualMemorySize() << " resident: " << pi.getResidentSize() << " mapped: " << ( MemoryMappedFile::totalMappedLength() / ( 1024 * 1024 ) ) << endl;
}
+ void MongoFile::unlockAll() {
+ rwlock lk( mmmutex , false );
+ for ( set<MongoFile*>::iterator i = mmfiles.begin(); i != mmfiles.end(); i++ ){
+ MongoFile * mmf = *i;
+ if (mmf) mmf->_unlock();
+ }
+ }
+#endif
} // namespace mongo
diff --git a/util/mmap.h b/util/mmap.h
index c3133e4..f4875be 100644
--- a/util/mmap.h
+++ b/util/mmap.h
@@ -18,52 +18,182 @@
#pragma once
namespace mongo {
+
+ /* the administrative-ish stuff here */
+ class MongoFile : boost::noncopyable {
+
+ public:
+ /** Flushable has to fail nicely if the underlying object gets killed */
+ class Flushable {
+ public:
+ virtual ~Flushable(){}
+ virtual void flush() = 0;
+ };
+
+ protected:
+ virtual void close() = 0;
+ virtual void flush(bool sync) = 0;
+ /**
+ * returns a thread safe object that you can call flush on
+ * Flushable has to fail nicely if the underlying object gets killed
+ */
+ virtual Flushable * prepareFlush() = 0;
+
+ void created(); /* subclass must call after create */
+ void destroyed(); /* subclass must call in destructor */
+
+ // only supporting on posix mmap
+ virtual void _lock() {}
+ virtual void _unlock() {}
- class MemoryMappedFile {
public:
+ virtual ~MongoFile() {}
+ virtual long length() = 0;
enum Options {
- SEQUENTIAL = 1
+ SEQUENTIAL = 1 // hint - e.g. FILE_FLAG_SEQUENTIAL_SCAN on windows
+ };
+
+ static int flushAll( bool sync ); // returns n flushed
+ static long long totalMappedLength();
+ static void closeAllFiles( stringstream &message );
+
+ // Locking allows writes. Reads are always allowed
+ static void lockAll();
+ static void unlockAll();
+
+ /* can be "overriden" if necessary */
+ static bool exists(boost::filesystem::path p) {
+ return boost::filesystem::exists(p);
+ }
+ };
+
+#ifndef _DEBUG
+ // no-ops in production
+ inline void MongoFile::lockAll() {}
+ inline void MongoFile::unlockAll() {}
+
+#endif
+
+ struct MongoFileAllowWrites {
+ MongoFileAllowWrites(){
+ MongoFile::lockAll();
+ }
+ ~MongoFileAllowWrites(){
+ MongoFile::unlockAll();
+ }
+ };
+
+ /** template for what a new storage engine's class definition must implement
+ PRELIMINARY - subject to change.
+ */
+ class StorageContainerTemplate : public MongoFile {
+ protected:
+ virtual void close();
+ virtual void flush(bool sync);
+ public:
+ virtual long length();
+
+ /** pointer to a range of space in this storage unit */
+ class Pointer {
+ public:
+ /** retried address of buffer at offset 'offset' withing the storage unit. returned range is a contiguous
+ buffer reflecting what is in storage. caller will not read or write past 'len'.
+
+ note calls may be received that are at different points in a range and different lengths. however
+ for now assume that on writes, if a call is made, previously returned addresses are no longer valid. i.e.
+ p = at(10000, 500);
+ q = at(10000, 600);
+ after the second call it is ok if p is invalid.
+ */
+ void* at(int offset, int len);
+
+ /** indicate that we wrote to the range (from a previous at() call) and that it needs
+ flushing to disk.
+ */
+ void written(int offset, int len);
+
+ bool isNull() const;
+ };
+
+ /** commit written() calls from above. */
+ void commit();
+
+ Pointer open(const char *filename);
+ Pointer open(const char *_filename, long &length, int options=0);
+ };
+
+ class MemoryMappedFile : public MongoFile {
+ public:
+ class Pointer {
+ char *_base;
+ public:
+ Pointer() : _base(0) { }
+ Pointer(void *p) : _base((char*) p) { }
+ void* at(int offset, int maxLen) { return _base + offset; }
+ void grow(int offset, int len) { /* no action required with mem mapped file */ }
+ bool isNull() const { return _base == 0; }
};
MemoryMappedFile();
- ~MemoryMappedFile(); /* closes the file if open */
+ ~MemoryMappedFile() {
+ destroyed();
+ close();
+ }
void close();
- // Throws exception if file doesn't exist.
+ // Throws exception if file doesn't exist. (dm may2010: not sure if this is always true?)
void* map( const char *filename );
+ /*To replace map():
+
+ Pointer open( const char *filename ) {
+ void *p = map(filename);
+ uassert(13077, "couldn't open/map file", p);
+ return Pointer(p);
+ }*/
+
/* Creates with length if DNE, otherwise uses existing file length,
passed length.
*/
void* map(const char *filename, long &length, int options = 0 );
void flush(bool sync);
+ virtual Flushable * prepareFlush();
- void* viewOfs() {
+ /*void* viewOfs() {
return view;
- }
+ }*/
long length() {
return len;
}
- static void updateLength( const char *filename, long &length );
-
- static long long totalMappedLength();
- static void closeAllFiles( stringstream &message );
- static int flushAll( bool sync );
-
private:
- void created();
+ static void updateLength( const char *filename, long &length );
HANDLE fd;
HANDLE maphandle;
void *view;
long len;
string _filename;
+
+ protected:
+ // only posix mmap implementations will support this
+ virtual void _lock();
+ virtual void _unlock();
+
};
void printMemInfo( const char * where );
+#include "ramstore.h"
+
+//#define _RAMSTORE
+#if defined(_RAMSTORE)
+ typedef RamStoreFile MMF;
+#else
+ typedef MemoryMappedFile MMF;
+#endif
+
} // namespace mongo
diff --git a/util/mmap_mm.cpp b/util/mmap_mm.cpp
index 9cffad5..3cbb0d2 100644
--- a/util/mmap_mm.cpp
+++ b/util/mmap_mm.cpp
@@ -1,4 +1,4 @@
-// mmap_mm.cpp
+// mmap_mm.cpp - in memory (no file) version
/* Copyright 2009 10gen Inc.
*
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-#include "stdafx.h"
+#include "pch.h"
#include "mmap.h"
/* in memory (no file) version */
@@ -45,6 +45,8 @@ namespace mongo {
void MemoryMappedFile::flush(bool sync) {
}
+ void MemoryMappedFile::_lock() {}
+ void MemoryMappedFile::_unlock() {}
}
diff --git a/util/mmap_posix.cpp b/util/mmap_posix.cpp
index a5caf8c..af1592c 100644
--- a/util/mmap_posix.cpp
+++ b/util/mmap_posix.cpp
@@ -15,9 +15,10 @@
* limitations under the License.
*/
-#include "stdafx.h"
+#include "pch.h"
#include "mmap.h"
#include "file_allocator.h"
+#include "../db/concurrency.h"
#include <errno.h>
#include <sys/mman.h>
@@ -60,7 +61,7 @@ namespace mongo {
fd = open(filename, O_RDWR | O_NOATIME);
if ( fd <= 0 ) {
- out() << "couldn't open " << filename << ' ' << OUTPUT_ERRNO << endl;
+ out() << "couldn't open " << filename << ' ' << errnoWithDescription() << endl;
return 0;
}
@@ -74,7 +75,7 @@ namespace mongo {
view = mmap(NULL, length, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
if ( view == MAP_FAILED ) {
- out() << " mmap() failed for " << filename << " len:" << length << " " << OUTPUT_ERRNO << endl;
+ out() << " mmap() failed for " << filename << " len:" << length << " " << errnoWithDescription() << endl;
if ( errno == ENOMEM ){
out() << " mmap failed with out of memory, if you're using 32-bits, then you probably need to upgrade to 64" << endl;
}
@@ -86,10 +87,15 @@ namespace mongo {
#else
if ( options & SEQUENTIAL ){
if ( madvise( view , length , MADV_SEQUENTIAL ) ){
- out() << " madvise failed for " << filename << " " << OUTPUT_ERRNO << endl;
+ out() << " madvise failed for " << filename << " " << errnoWithDescription() << endl;
}
}
#endif
+
+ DEV if (! dbMutex.info().isLocked()){
+ _unlock();
+ }
+
return view;
}
@@ -97,9 +103,38 @@ namespace mongo {
if ( view == 0 || fd == 0 )
return;
if ( msync(view, len, sync ? MS_SYNC : MS_ASYNC) )
- problem() << "msync " << OUTPUT_ERRNO << endl;
+ problem() << "msync " << errnoWithDescription() << endl;
}
+ class PosixFlushable : public MemoryMappedFile::Flushable {
+ public:
+ PosixFlushable( void * view , HANDLE fd , long len )
+ : _view( view ) , _fd( fd ) , _len(len){
+ }
+
+ void flush(){
+ if ( _view && _fd )
+ if ( msync(_view, _len, MS_SYNC ) )
+ problem() << "msync " << errnoWithDescription() << endl;
+
+ }
+
+ void * _view;
+ HANDLE _fd;
+ long _len;
+ };
+
+ MemoryMappedFile::Flushable * MemoryMappedFile::prepareFlush(){
+ return new PosixFlushable( view , fd , len );
+ }
+
+ void MemoryMappedFile::_lock() {
+ if (view) assert(mprotect(view, len, PROT_READ | PROT_WRITE) == 0);
+ }
+
+ void MemoryMappedFile::_unlock() {
+ if (view) assert(mprotect(view, len, PROT_READ) == 0);
+ }
} // namespace mongo
diff --git a/util/mmap_win.cpp b/util/mmap_win.cpp
index 6168d9d..97e1589 100644
--- a/util/mmap_win.cpp
+++ b/util/mmap_win.cpp
@@ -15,8 +15,9 @@
* limitations under the License.
*/
-#include "stdafx.h"
+#include "pch.h"
#include "mmap.h"
+#include "text.h"
#include <windows.h>
namespace mongo {
@@ -40,12 +41,6 @@ namespace mongo {
CloseHandle(fd);
fd = 0;
}
-
- std::wstring toWideString(const char *s) {
- std::basic_ostringstream<TCHAR> buf;
- buf << s;
- return buf.str();
- }
unsigned mapped = 0;
@@ -68,14 +63,14 @@ namespace mongo {
}
updateLength( filename, length );
- std::wstring filenamew = toWideString(filename);
DWORD createOptions = FILE_ATTRIBUTE_NORMAL;
if ( options & SEQUENTIAL )
createOptions |= FILE_FLAG_SEQUENTIAL_SCAN;
fd = CreateFile(
- filenamew.c_str(), GENERIC_WRITE | GENERIC_READ, FILE_SHARE_READ,
+ toNativeString(filename).c_str(),
+ GENERIC_WRITE | GENERIC_READ, FILE_SHARE_READ,
NULL, OPEN_ALWAYS, createOptions , NULL);
if ( fd == INVALID_HANDLE_VALUE ) {
out() << "Create/OpenFile failed " << filename << ' ' << GetLastError() << endl;
@@ -92,7 +87,7 @@ namespace mongo {
view = MapViewOfFile(maphandle, FILE_MAP_ALL_ACCESS, 0, 0, 0);
if ( view == 0 ) {
- out() << "MapViewOfFile failed " << filename << " " << OUTPUT_ERRNO << " ";
+ out() << "MapViewOfFile failed " << filename << " " << errnoWithDescription() << " ";
out() << GetLastError();
out() << endl;
}
@@ -100,21 +95,47 @@ namespace mongo {
return view;
}
+ class WindowsFlushable : public MemoryMappedFile::Flushable {
+ public:
+ WindowsFlushable( void * view , HANDLE fd , string filename )
+ : _view(view) , _fd(fd) , _filename(filename){
+
+ }
+
+ void flush(){
+ if (!_view || !_fd)
+ return;
+
+ bool success = FlushViewOfFile(_view, 0); // 0 means whole mapping
+ if (!success){
+ int err = GetLastError();
+ out() << "FlushViewOfFile failed " << err << " file: " << _filename << endl;
+ }
+
+ success = FlushFileBuffers(_fd);
+ if (!success){
+ int err = GetLastError();
+ out() << "FlushFileBuffers failed " << err << " file: " << _filename << endl;
+ }
+ }
+
+ void * _view;
+ HANDLE _fd;
+ string _filename;
+
+ };
+
void MemoryMappedFile::flush(bool sync) {
uassert(13056, "Async flushing not supported on windows", sync);
+
+ WindowsFlushable f( view , fd , _filename );
+ f.flush();
+ }
- if (!view || !fd) return;
-
- bool success = FlushViewOfFile(view, 0); // 0 means whole mapping
- if (!success){
- int err = GetLastError();
- out() << "FlushViewOfFile failed " << err << " file: " << _filename << endl;
- }
-
- success = FlushFileBuffers(fd);
- if (!success){
- int err = GetLastError();
- out() << "FlushFileBuffers failed " << err << " file: " << _filename << endl;
- }
+ MemoryMappedFile::Flushable * MemoryMappedFile::prepareFlush(){
+ return new WindowsFlushable( view , fd , _filename );
}
+ void MemoryMappedFile::_lock() {}
+ void MemoryMappedFile::_unlock() {}
+
}
diff --git a/util/mongoutils/README b/util/mongoutils/README
new file mode 100755
index 0000000..d3d874b
--- /dev/null
+++ b/util/mongoutils/README
@@ -0,0 +1,7 @@
+mongoutils namespace requirements:
+
+(1) code is not database specific, rather, true utilities
+(2) are cross platform
+(3) may require boost headers, but not libs
+(4) are clean and easy to use in any c++ project without pulling in lots of other stuff
+(5) apache license
diff --git a/util/mongoutils/checksum.h b/util/mongoutils/checksum.h
new file mode 100644
index 0000000..6beb7f4
--- /dev/null
+++ b/util/mongoutils/checksum.h
@@ -0,0 +1,32 @@
+/** @checksum.h */
+
+/* Copyright 2009 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+namespace mongoutils {
+
+ /**
+ * this is a silly temporary implementation
+ */
+ inline int checksum( const char* x , int size ){
+ int ck = 0;
+ for ( int i=0; i<size; i++ )
+ ck += ( (int)x[i] * ( i + 1 ) );
+ return ck;
+ }
+
+}
diff --git a/util/mongoutils/html.h b/util/mongoutils/html.h
new file mode 100644
index 0000000..e8502ec
--- /dev/null
+++ b/util/mongoutils/html.h
@@ -0,0 +1,158 @@
+// @file html.h
+
+#pragma once
+
+/* Things in the mongoutils namespace
+ (1) are not database specific, rather, true utilities
+ (2) are cross platform
+ (3) may require boost headers, but not libs
+ (4) are clean and easy to use in any c++ project without pulling in lots of other stuff
+*/
+
+/* Copyright 2010 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <sstream>
+
+namespace mongoutils {
+
+ namespace html {
+
+ using namespace std;
+
+ inline string _end() { return "</body></html>"; }
+ inline string _table() { return "</table>\n\n"; }
+ inline string _tr() { return "</tr>\n"; }
+
+ inline string tr() { return "<tr>"; }
+ inline string tr(string a, string b) {
+ stringstream ss;
+ ss << "<tr><td>" << a << "</td><td>" << b << "</td></tr>\n";
+ return ss.str();
+ }
+ template <class T>
+ inline string td(T x) {
+ stringstream ss;
+ ss << "<td>" << x << "</td>";
+ return ss.str();
+ }
+ inline string td(string x) {
+ return "<td>" + x + "</td>";
+ }
+ inline string th(string x) {
+ return "<th>" + x + "</th>";
+ }
+
+ inline void tablecell( stringstream& ss , bool b ){
+ ss << "<td>" << (b ? "<b>X</b>" : "") << "</td>";
+ }
+
+ template< typename T>
+ inline void tablecell( stringstream& ss , const T& t ){
+ ss << "<td>" << t << "</td>";
+ }
+
+ inline string table(const char *headers[] = 0, bool border = true) {
+ stringstream ss;
+ ss << "\n<table "
+ << (border?"border=1 ":"")
+ << "cellpadding=2 cellspacing=0>\n";
+ if( headers ) {
+ ss << "<tr>";
+ while( *headers ) {
+ ss << "<th>" << *headers << "</th>";
+ headers++;
+ }
+ ss << "</tr>\n";
+ }
+ return ss.str();
+ }
+
+ inline string start(string title) {
+ stringstream ss;
+ ss << "<html><head>\n<title>";
+ ss << title;
+ ss << "</title>\n";
+
+ ss << "<style type=\"text/css\" media=\"screen\">"
+ "body { font-family: helvetica, arial, san-serif }\n"
+ "table { border-collapse:collapse; border-color:#999; margin-top:.5em }\n"
+ "th { background-color:#bbb; color:#000 }\n"
+ "td,th { padding:.25em }\n"
+ "</style>\n";
+
+ ss << "</head>\n<body>\n";
+ return ss.str();
+ }
+
+ inline string red(string contentHtml, bool color=true) {
+ if( !color ) return contentHtml;
+ stringstream ss;
+ ss << "<span style=\"color:#A00;\">" << contentHtml << "</span>";
+ return ss.str();
+ }
+ inline string grey(string contentHtml, bool color=true) {
+ if( !color ) return contentHtml;
+ stringstream ss;
+ ss << "<span style=\"color:#888;\">" << contentHtml << "</span>";
+ return ss.str();
+ }
+ inline string blue(string contentHtml, bool color=true) {
+ if( !color ) return contentHtml;
+ stringstream ss;
+ ss << "<span style=\"color:#00A;\">" << contentHtml << "</span>";
+ return ss.str();
+ }
+ inline string yellow(string contentHtml, bool color=true) {
+ if( !color ) return contentHtml;
+ stringstream ss;
+ ss << "<span style=\"color:#A80;\">" << contentHtml << "</span>";
+ return ss.str();
+ }
+ inline string green(string contentHtml, bool color=true) {
+ if( !color ) return contentHtml;
+ stringstream ss;
+ ss << "<span style=\"color:#0A0;\">" << contentHtml << "</span>";
+ return ss.str();
+ }
+
+ inline string p(string contentHtml) {
+ stringstream ss;
+ ss << "<p>" << contentHtml << "</p>\n";
+ return ss.str();
+ }
+
+ inline string h2(string contentHtml) {
+ stringstream ss;
+ ss << "<h2>" << contentHtml << "</h2>\n";
+ return ss.str();
+ }
+
+ /* does NOT escape the strings. */
+ inline string a(string href, string title="", string contentHtml = "") {
+ stringstream ss;
+ ss << "<a";
+ if( !href.empty() ) ss << " href=\"" << href << '"';
+ if( !title.empty() ) ss << " title=\"" << title << '"';
+ ss << '>';
+ if( !contentHtml.empty() ) {
+ ss << contentHtml << "</a>";
+ }
+ return ss.str();
+ }
+
+ }
+
+}
diff --git a/util/mongoutils/mongoutils.vcxproj b/util/mongoutils/mongoutils.vcxproj
new file mode 100755
index 0000000..f8919cd
--- /dev/null
+++ b/util/mongoutils/mongoutils.vcxproj
@@ -0,0 +1,73 @@
+<?xml version="1.0" encoding="utf-8"?>
+<Project DefaultTargets="Build" ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <ItemGroup Label="ProjectConfigurations">
+ <ProjectConfiguration Include="Debug|Win32">
+ <Configuration>Debug</Configuration>
+ <Platform>Win32</Platform>
+ </ProjectConfiguration>
+ <ProjectConfiguration Include="Release|Win32">
+ <Configuration>Release</Configuration>
+ <Platform>Win32</Platform>
+ </ProjectConfiguration>
+ </ItemGroup>
+ <PropertyGroup Label="Globals">
+ <ProjectGuid>{7B84584E-92BC-4DB9-971B-A1A8F93E5053}</ProjectGuid>
+ <RootNamespace>mongoutils</RootNamespace>
+ <ProjectName>mongoutils test program</ProjectName>
+ </PropertyGroup>
+ <Import Project="$(VCTargetsPath)\Microsoft.Cpp.Default.props" />
+ <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'" Label="Configuration">
+ <ConfigurationType>Application</ConfigurationType>
+ <UseDebugLibraries>true</UseDebugLibraries>
+ <CharacterSet>MultiByte</CharacterSet>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'" Label="Configuration">
+ <ConfigurationType>Application</ConfigurationType>
+ <UseDebugLibraries>false</UseDebugLibraries>
+ <WholeProgramOptimization>true</WholeProgramOptimization>
+ <CharacterSet>MultiByte</CharacterSet>
+ </PropertyGroup>
+ <Import Project="$(VCTargetsPath)\Microsoft.Cpp.props" />
+ <ImportGroup Label="ExtensionSettings">
+ </ImportGroup>
+ <ImportGroup Label="PropertySheets" Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
+ <Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
+ </ImportGroup>
+ <ImportGroup Label="PropertySheets" Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
+ <Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
+ </ImportGroup>
+ <PropertyGroup Label="UserMacros" />
+ <PropertyGroup />
+ <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
+ <ClCompile>
+ <WarningLevel>Level3</WarningLevel>
+ <Optimization>Disabled</Optimization>
+ </ClCompile>
+ <Link>
+ <GenerateDebugInformation>true</GenerateDebugInformation>
+ </Link>
+ </ItemDefinitionGroup>
+ <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
+ <ClCompile>
+ <WarningLevel>Level3</WarningLevel>
+ <Optimization>MaxSpeed</Optimization>
+ <FunctionLevelLinking>true</FunctionLevelLinking>
+ <IntrinsicFunctions>true</IntrinsicFunctions>
+ </ClCompile>
+ <Link>
+ <GenerateDebugInformation>true</GenerateDebugInformation>
+ <EnableCOMDATFolding>true</EnableCOMDATFolding>
+ <OptimizeReferences>true</OptimizeReferences>
+ </Link>
+ </ItemDefinitionGroup>
+ <ItemGroup>
+ <ClCompile Include="test.cpp" />
+ </ItemGroup>
+ <ItemGroup>
+ <ClInclude Include="html.h" />
+ <ClInclude Include="str.h" />
+ </ItemGroup>
+ <Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
+ <ImportGroup Label="ExtensionTargets">
+ </ImportGroup>
+</Project> \ No newline at end of file
diff --git a/util/mongoutils/mongoutils.vcxproj.filters b/util/mongoutils/mongoutils.vcxproj.filters
new file mode 100755
index 0000000..84ecbff
--- /dev/null
+++ b/util/mongoutils/mongoutils.vcxproj.filters
@@ -0,0 +1,10 @@
+<?xml version="1.0" encoding="utf-8"?>
+<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <ItemGroup>
+ <ClCompile Include="test.cpp" />
+ </ItemGroup>
+ <ItemGroup>
+ <ClInclude Include="html.h" />
+ <ClInclude Include="str.h" />
+ </ItemGroup>
+</Project> \ No newline at end of file
diff --git a/util/mongoutils/str.h b/util/mongoutils/str.h
new file mode 100644
index 0000000..2028264
--- /dev/null
+++ b/util/mongoutils/str.h
@@ -0,0 +1,118 @@
+// @file str.h
+
+/* Copyright 2010 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+/* Things in the mongoutils namespace
+ (1) are not database specific, rather, true utilities
+ (2) are cross platform
+ (3) may require boost headers, but not libs
+ (4) are clean and easy to use in any c++ project without pulling in lots of other stuff
+
+ Note: within this module, we use int for all offsets -- there are no unsigned offsets
+ and no size_t's. If you need 3 gigabyte long strings, don't use this module.
+*/
+
+#include <string>
+#include <sstream>
+
+namespace mongoutils {
+
+ namespace str {
+
+ using namespace std;
+
+ /** the idea here is to make one liners easy. e.g.:
+
+ return str::stream() << 1 << ' ' << 2;
+
+ since the following doesn't work:
+
+ (stringstream() << 1).str();
+ */
+ class stream {
+ public:
+ stringstream ss;
+
+ template<class T>
+ stream& operator<<(const T& v) {
+ ss << v;
+ return *this;
+ }
+
+ operator std::string () const { return ss.str(); }
+ };
+
+ inline bool startsWith(const char *str, const char *prefix) {
+ const char *s = str;
+ const char *p = prefix;
+ while( *p ) {
+ if( *p != *s ) return false;
+ p++; s++;
+ }
+ return true;
+ }
+ inline bool startsWith(string s, string p) { return startsWith(s.c_str(), p.c_str()); }
+
+ inline bool endsWith(string s, string p) {
+ int l = p.size();
+ int x = s.size();
+ if( x < l ) return false;
+ return strncmp(s.c_str()+x-l, p.c_str(), l) == 0;
+ }
+
+ /** find char x, and return rest of string thereafter, or "" if not found */
+ inline const char * after(const char *s, char x) {
+ const char *p = strchr(s, x);
+ return (p != 0) ? p+1 : ""; }
+ inline string after(const string& s, char x) {
+ const char *p = strchr(s.c_str(), x);
+ return (p != 0) ? string(p+1) : ""; }
+
+ inline const char * after(const char *s, const char *x) {
+ const char *p = strstr(s, x);
+ return (p != 0) ? p+strlen(x) : ""; }
+ inline string after(string s, string x) {
+ const char *p = strstr(s.c_str(), x.c_str());
+ return (p != 0) ? string(p+x.size()) : ""; }
+
+ inline bool contains(string s, string x) {
+ return strstr(s.c_str(), x.c_str()) != 0; }
+
+ /** @return everything befor the character x, else entire string */
+ inline string before(const string& s, char x) {
+ const char *p = strchr(s.c_str(), x);
+ return (p != 0) ? s.substr(0, p-s.c_str()) : s; }
+
+ /** check if if strings share a common starting prefix
+ @return offset of divergence (or length if equal). 0=nothing in common. */
+ inline int shareCommonPrefix(const char *p, const char *q) {
+ int ofs = 0;
+ while( 1 ) {
+ if( *p == 0 || *q == 0 )
+ break;
+ if( *p != *q )
+ break;
+ p++; q++; ofs++;
+ }
+ return ofs; }
+ inline int shareCommonPrefix(const string &a, const string &b)
+ { return shareCommonPrefix(a.c_str(), b.c_str()); }
+
+ }
+
+}
diff --git a/util/mongoutils/test.cpp b/util/mongoutils/test.cpp
new file mode 100755
index 0000000..0420624
--- /dev/null
+++ b/util/mongoutils/test.cpp
@@ -0,0 +1,34 @@
+/* @file test.cpp
+ utils/mongoutils/test.cpp
+ unit tests for mongoutils
+*/
+
+/*
+ * Copyright 2010 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "str.h"
+#include "html.h"
+#include <assert.h>
+
+using namespace std;
+using namespace mongoutils;
+
+int main() {
+ string x = str::after("abcde", 'c');
+ assert( x == "de" );
+ assert( str::after("abcde", 'x') == "" );
+ return 0;
+}
diff --git a/util/ntservice.cpp b/util/ntservice.cpp
index 251be92..fe4ae44 100644
--- a/util/ntservice.cpp
+++ b/util/ntservice.cpp
@@ -15,8 +15,10 @@
* limitations under the License.
*/
-#include "stdafx.h"
+#include "pch.h"
#include "ntservice.h"
+#include "winutil.h"
+#include "text.h"
#include <direct.h>
#if defined(_WIN32)
@@ -25,14 +27,14 @@ namespace mongo {
void shutdown();
- SERVICE_STATUS_HANDLE ServiceController::_statusHandle = null;
+ SERVICE_STATUS_HANDLE ServiceController::_statusHandle = NULL;
std::wstring ServiceController::_serviceName;
- ServiceCallback ServiceController::_serviceCallback = null;
+ ServiceCallback ServiceController::_serviceCallback = NULL;
ServiceController::ServiceController() {
}
- bool ServiceController::installService( const std::wstring& serviceName, const std::wstring& displayName, const std::wstring& serviceDesc, int argc, char* argv[] ) {
+ bool ServiceController::installService( const std::wstring& serviceName, const std::wstring& displayName, const std::wstring& serviceDesc, const std::wstring& serviceUser, const std::wstring& servicePassword, const std::string dbpath, int argc, char* argv[] ) {
assert(argc >= 1);
stringstream commandLine;
@@ -46,38 +48,81 @@ namespace mongo {
}
for ( int i = 1; i < argc; i++ ) {
- std::string arg( argv[ i ] );
-
- // replace install command to indicate process is being started as a service
- if ( arg == "--install" )
- arg = "--service";
-
- commandLine << arg << " ";
- }
-
- SC_HANDLE schSCManager = ::OpenSCManager( null, null, SC_MANAGER_ALL_ACCESS );
- if ( schSCManager == null )
+ std::string arg( argv[ i ] );
+ // replace install command to indicate process is being started as a service
+ if ( arg == "--install" || arg == "--reinstall" ) {
+ arg = "--service";
+ } else if ( arg == "--dbpath" && i + 1 < argc ) {
+ commandLine << arg << " \"" << dbpath << "\" ";
+ i++;
+ continue;
+ } else if ( arg.length() > 9 && arg.substr(0, 9) == "--service" ) {
+ // Strip off --service(Name|User|Password) arguments
+ continue;
+ }
+ commandLine << arg << " ";
+ }
+
+ SC_HANDLE schSCManager = ::OpenSCManager( NULL, NULL, SC_MANAGER_ALL_ACCESS );
+ if ( schSCManager == NULL ) {
+ DWORD err = ::GetLastError();
+ cerr << "Error connecting to the Service Control Manager: " << GetWinErrMsg(err) << endl;
+ return false;
+ }
+
+ // Make sure servise doesn't already exist.
+ // TODO: Check to see if service is in "Deleting" status, suggest the user close down Services MMC snap-ins.
+ SC_HANDLE schService = ::OpenService( schSCManager, serviceName.c_str(), SERVICE_ALL_ACCESS );
+ if ( schService != NULL ) {
+ cerr << "There is already a service named " << toUtf8String(serviceName) << ". Aborting" << endl;
+ ::CloseServiceHandle( schService );
+ ::CloseServiceHandle( schSCManager );
return false;
-
+ }
std::basic_ostringstream< TCHAR > commandLineWide;
- commandLineWide << commandLine.str().c_str();
+ commandLineWide << commandLine.str().c_str();
+
+ cerr << "Creating service " << toUtf8String(serviceName) << "." << endl;
// create new service
- SC_HANDLE schService = ::CreateService( schSCManager, serviceName.c_str(), displayName.c_str(),
+ schService = ::CreateService( schSCManager, serviceName.c_str(), displayName.c_str(),
SERVICE_ALL_ACCESS, SERVICE_WIN32_OWN_PROCESS,
SERVICE_AUTO_START, SERVICE_ERROR_NORMAL,
- commandLineWide.str().c_str(), null, null, L"\0\0", null, null );
+ commandLineWide.str().c_str(), NULL, NULL, L"\0\0", NULL, NULL );
+ if ( schService == NULL ) {
+ DWORD err = ::GetLastError();
+ cerr << "Error creating service: " << GetWinErrMsg(err) << endl;
+ ::CloseServiceHandle( schSCManager );
+ return false;
+ }
- if ( schService == null ) {
- ::CloseServiceHandle( schSCManager );
- return false;
- }
+ cerr << "Service creation successful." << endl;
+ cerr << "Service can be started from the command line via 'net start \"" << toUtf8String(serviceName) << "\"'." << endl;
+
+ bool serviceInstalled;
+
+ // TODO: If neccessary grant user "Login as a Service" permission.
+ if ( !serviceUser.empty() ) {
+ std::wstring actualServiceUser;
+ if ( serviceUser.find(L"\\") == string::npos ) {
+ actualServiceUser = L".\\" + serviceUser;
+ }
+ else {
+ actualServiceUser = serviceUser;
+ }
+ cerr << "Setting service login credentials. User: " << toUtf8String(actualServiceUser) << endl;
+ serviceInstalled = ::ChangeServiceConfig( schService, SERVICE_NO_CHANGE, SERVICE_NO_CHANGE, SERVICE_NO_CHANGE, NULL, NULL, NULL, NULL, actualServiceUser.c_str(), servicePassword.c_str(), NULL );
+ if ( !serviceInstalled ) {
+ cerr << "Setting service login failed. Service has 'LocalService' permissions." << endl;
+ }
+ }
+
+ // set the service description
SERVICE_DESCRIPTION serviceDescription;
serviceDescription.lpDescription = (LPTSTR)serviceDesc.c_str();
-
- // set new service description
- bool serviceInstalled = ::ChangeServiceConfig2( schService, SERVICE_CONFIG_DESCRIPTION, &serviceDescription );
+ serviceInstalled = ::ChangeServiceConfig2( schService, SERVICE_CONFIG_DESCRIPTION, &serviceDescription );
+
if ( serviceInstalled ) {
SC_ACTION aActions[ 3 ] = { { SC_ACTION_RESTART, 0 }, { SC_ACTION_RESTART, 0 }, { SC_ACTION_RESTART, 0 } };
@@ -89,8 +134,12 @@ namespace mongo {
// set service recovery options
serviceInstalled = ::ChangeServiceConfig2( schService, SERVICE_CONFIG_FAILURE_ACTIONS, &serviceFailure );
+
}
-
+ else {
+ cerr << "Could not set service description. Check the event log for more details." << endl;
+ }
+
::CloseServiceHandle( schService );
::CloseServiceHandle( schSCManager );
@@ -98,13 +147,16 @@ namespace mongo {
}
bool ServiceController::removeService( const std::wstring& serviceName ) {
- SC_HANDLE schSCManager = ::OpenSCManager( null, null, SC_MANAGER_ALL_ACCESS );
- if ( schSCManager == null )
- return false;
+ SC_HANDLE schSCManager = ::OpenSCManager( NULL, NULL, SC_MANAGER_ALL_ACCESS );
+ if ( schSCManager == NULL ) {
+ DWORD err = ::GetLastError();
+ cerr << "Error connecting to the Service Control Manager: " << GetWinErrMsg(err) << endl;
+ return false;
+ }
SC_HANDLE schService = ::OpenService( schSCManager, serviceName.c_str(), SERVICE_ALL_ACCESS );
-
- if ( schService == null ) {
+ if ( schService == NULL ) {
+ cerr << "Could not find a service named " << toUtf8String(serviceName) << " to uninstall." << endl;
::CloseServiceHandle( schSCManager );
return false;
}
@@ -113,20 +165,30 @@ namespace mongo {
// stop service if its running
if ( ::ControlService( schService, SERVICE_CONTROL_STOP, &serviceStatus ) ) {
+ cerr << "Service " << toUtf8String(serviceName) << " is currently running. Stopping service." << endl;
while ( ::QueryServiceStatus( schService, &serviceStatus ) ) {
if ( serviceStatus.dwCurrentState == SERVICE_STOP_PENDING )
- {
- Sleep( 1000 );
- }
- else { break; }
+ {
+ Sleep( 1000 );
+ }
+ else { break; }
}
+ cerr << "Service stopped." << endl;
}
+ cerr << "Deleting service " << toUtf8String(serviceName) << "." << endl;
bool serviceRemoved = ::DeleteService( schService );
::CloseServiceHandle( schService );
::CloseServiceHandle( schSCManager );
+ if (serviceRemoved) {
+ cerr << "Service deleted successfully." << endl;
+ }
+ else {
+ cerr << "Failed to delete service." << endl;
+ }
+
return serviceRemoved;
}
@@ -136,14 +198,14 @@ namespace mongo {
SERVICE_TABLE_ENTRY dispTable[] = {
{ (LPTSTR)serviceName.c_str(), (LPSERVICE_MAIN_FUNCTION)ServiceController::initService },
- { null, null }
+ { NULL, NULL }
};
return StartServiceCtrlDispatcher( dispTable );
}
bool ServiceController::reportStatus( DWORD reportState, DWORD waitHint ) {
- if ( _statusHandle == null )
+ if ( _statusHandle == NULL )
return false;
static DWORD checkPoint = 1;
diff --git a/util/ntservice.h b/util/ntservice.h
index 00b8a0a..271e7d7 100644
--- a/util/ntservice.h
+++ b/util/ntservice.h
@@ -29,7 +29,7 @@ namespace mongo {
ServiceController();
virtual ~ServiceController() {}
- static bool installService( const std::wstring& serviceName, const std::wstring& displayName, const std::wstring& serviceDesc, int argc, char* argv[] );
+ static bool installService( const std::wstring& serviceName, const std::wstring& displayName, const std::wstring& serviceDesc, const std::wstring& serviceUser, const std::wstring& servicePassword, const std::string dbpath, int argc, char* argv[] );
static bool removeService( const std::wstring& serviceName );
static bool startService( const std::wstring& serviceName, ServiceCallback startService );
static bool reportStatus( DWORD reportState, DWORD waitHint = 0 );
diff --git a/util/optime.h b/util/optime.h
index 8b26434..4645968 100644
--- a/util/optime.h
+++ b/util/optime.h
@@ -20,15 +20,20 @@
#include "../db/concurrency.h"
namespace mongo {
- void exitCleanly( int code );
+ void exitCleanly( ExitCode code );
- /* Operation sequence #. A combination of current second plus an ordinal value.
- */
struct ClockSkewException : public DBException {
- virtual const char* what() const throw() { return "clock skew exception"; }
- virtual int getCode(){ return 20001; }
+ ClockSkewException() : DBException( "clock skew exception" , 20001 ){}
};
-
+
+ /* replsets use RSOpTime.
+ M/S uses OpTime.
+ But this is useable from both.
+ */
+ typedef unsigned long long ReplTime;
+
+ /* Operation sequence #. A combination of current second plus an ordinal value.
+ */
#pragma pack(4)
class OpTime {
unsigned i;
@@ -44,8 +49,8 @@ namespace mongo {
OpTime(Date_t date) {
reinterpret_cast<unsigned long long&>(*this) = date.millis;
}
- OpTime(unsigned long long date) {
- reinterpret_cast<unsigned long long&>(*this) = date;
+ OpTime(ReplTime x) {
+ reinterpret_cast<unsigned long long&>(*this) = x;
}
OpTime(unsigned a, unsigned b) {
secs = a;
@@ -87,14 +92,14 @@ namespace mongo {
bytes of overhead.
*/
unsigned long long asDate() const {
- return *((unsigned long long *) &i);
+ return reinterpret_cast<const unsigned long long*>(&i)[0];
}
- // unsigned long long& asDate() { return *((unsigned long long *) &i); }
-
- bool isNull() {
- return secs == 0;
+ long long asLL() const {
+ return reinterpret_cast<const long long*>(&i)[0];
}
+ bool isNull() const { return secs == 0; }
+
string toStringLong() const {
char buf[64];
time_t_to_String(secs, buf);
@@ -104,12 +109,18 @@ namespace mongo {
return ss.str();
}
+ string toStringPretty() const {
+ stringstream ss;
+ ss << time_t_to_String_short(secs) << ':' << hex << i;
+ return ss.str();
+ }
+
string toString() const {
stringstream ss;
ss << hex << secs << ':' << i;
return ss.str();
}
- operator string() const { return toString(); }
+
bool operator==(const OpTime& r) const {
return i == r.i && secs == r.secs;
}
@@ -121,6 +132,15 @@ namespace mongo {
return secs < r.secs;
return i < r.i;
}
+ bool operator<=(const OpTime& r) const {
+ return *this < r || *this == r;
+ }
+ bool operator>(const OpTime& r) const {
+ return !(*this <= r);
+ }
+ bool operator>=(const OpTime& r) const {
+ return !(*this < r);
+ }
};
#pragma pack()
diff --git a/util/password.cpp b/util/password.cpp
new file mode 100644
index 0000000..3037da1
--- /dev/null
+++ b/util/password.cpp
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2010 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "pch.h"
+#include "password.h"
+#include <iostream>
+
+#ifndef _WIN32
+#include <termios.h>
+#endif
+
+using namespace std;
+
+namespace mongo {
+
+ string askPassword() {
+
+ std::string password;
+ cout << "Enter password: ";
+#ifndef _WIN32
+ const int stdinfd = 0;
+ termios termio;
+ tcflag_t old = 0;
+ if ( isatty( stdinfd ) ) {
+ int i = tcgetattr( stdinfd, &termio );
+ if( i == -1 ) {
+ cerr << "Cannot get terminal attributes " << errnoWithDescription() << endl;
+ return string();
+ }
+ old = termio.c_lflag;
+ termio.c_lflag &= ~ECHO;
+ i = tcsetattr( stdinfd, TCSANOW, &termio );
+ if( i == -1 ) {
+ cerr << "Cannot set terminal attributes " << errnoWithDescription() << endl;
+ return string();
+ }
+ }
+
+ cin >> password;
+
+ if ( isatty( stdinfd ) ) {
+ termio.c_lflag = old;
+ int i = tcsetattr( stdinfd, TCSANOW, &termio );
+ if( i == -1 ) {
+ cerr << "Cannot set terminal attributes " << errnoWithDescription() << endl;
+ return string();
+ }
+ }
+#else
+ HANDLE stdinh = GetStdHandle( STD_INPUT_HANDLE );
+ if ( stdinh == INVALID_HANDLE_VALUE) {
+ cerr << "Cannot get stdin handle " << GetLastError() << "\n";
+ return string();
+ }
+
+ DWORD old;
+ if ( !GetConsoleMode( stdinh, &old ) ) {
+ cerr << "Cannot get console mode " << GetLastError() << "\n";
+ return string();
+ }
+
+ DWORD noecho = ENABLE_LINE_INPUT | ENABLE_PROCESSED_INPUT;
+ if ( !SetConsoleMode( stdinh, noecho ) ) {
+ cerr << "Cannot set console mode " << GetLastError() << "\n";
+ return string();
+ }
+
+ cin >> password;
+
+ if ( !SetConsoleMode( stdinh, old ) ) {
+ cerr << "Cannot set console mode " << GetLastError() << "\n";
+ return string();
+ }
+ cin.get();
+#endif
+ cout << "\n";
+ return password;
+ }
+}
diff --git a/util/password.h b/util/password.h
new file mode 100644
index 0000000..18294b2
--- /dev/null
+++ b/util/password.h
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2010 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#pragma once
+
+#include <boost/program_options.hpp>
+#include <string>
+
+namespace mongo {
+
+ struct PasswordValue : public boost::program_options::typed_value<std::string> {
+
+ PasswordValue( std::string* val )
+ : boost::program_options::typed_value<std::string>( val ) { }
+
+ unsigned min_tokens() const {
+ return 0;
+ }
+
+ unsigned max_tokens() const {
+ return 1;
+ }
+
+ bool is_required() const {
+ return false;
+ }
+
+ void xparse( boost::any& value_store,
+ const std::vector<std::string>& new_tokens ) const {
+ if ( !value_store.empty() )
+#if BOOST_VERSION >= 104200
+ boost::throw_exception( boost::program_options::validation_error( boost::program_options::validation_error::multiple_values_not_allowed ) );
+#else
+ boost::throw_exception( boost::program_options::validation_error( "multiple values not allowed" ) );
+#endif
+ else if ( !new_tokens.empty() )
+ boost::program_options::typed_value<std::string>::xparse
+ (value_store, new_tokens);
+ else
+ value_store = std::string();
+ }
+
+ };
+
+ std::string askPassword();
+
+}
diff --git a/util/processinfo.cpp b/util/processinfo.cpp
new file mode 100644
index 0000000..3257b5e
--- /dev/null
+++ b/util/processinfo.cpp
@@ -0,0 +1,47 @@
+// processinfo.cpp
+
+/* Copyright 2009 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "pch.h"
+#include "processinfo.h"
+
+#include <iostream>
+using namespace std;
+
+namespace mongo {
+
+ class PidFileWiper {
+ public:
+ ~PidFileWiper(){
+ ofstream out( path.c_str() , ios_base::out );
+ out.close();
+ }
+
+ void write( const string& p ){
+ path = p;
+ ofstream out( path.c_str() , ios_base::out );
+ out << getpid() << endl;
+ out.close();
+ }
+
+ string path;
+ } pidFileWiper;
+
+ void writePidFile( const string& path ){
+ pidFileWiper.write( path );
+ }
+
+}
diff --git a/util/processinfo.h b/util/processinfo.h
index b7bc90d..8e20beb 100644
--- a/util/processinfo.h
+++ b/util/processinfo.h
@@ -18,6 +18,7 @@
#pragma once
#include <sys/types.h>
+#include <string>
#ifndef _WIN32
#include <unistd.h>
@@ -58,5 +59,7 @@ namespace mongo {
private:
pid_t _pid;
};
-
+
+ void writePidFile( const std::string& path );
+
}
diff --git a/util/processinfo_darwin.cpp b/util/processinfo_darwin.cpp
index 206c270..cb54bed 100644
--- a/util/processinfo_darwin.cpp
+++ b/util/processinfo_darwin.cpp
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-#include "../stdafx.h"
+#include "../pch.h"
#include "processinfo.h"
#include "log.h"
@@ -108,7 +108,7 @@ namespace mongo {
start = start - ( (unsigned long long)start % pageSize );
char x = 0;
if ( mincore( start , 128 , &x ) ){
- log() << "mincore failed: " << OUTPUT_ERRNO << endl;
+ log() << "mincore failed: " << errnoWithDescription() << endl;
return 1;
}
return x & 0x1;
diff --git a/util/processinfo_linux2.cpp b/util/processinfo_linux2.cpp
index 917e707..02a7ad0 100644
--- a/util/processinfo_linux2.cpp
+++ b/util/processinfo_linux2.cpp
@@ -40,11 +40,10 @@ namespace mongo {
FILE * f = fopen( name , "r");
if ( ! f ){
stringstream ss;
- ss << "couldn't open [" << name << "] " << OUTPUT_ERRNO;
+ ss << "couldn't open [" << name << "] " << errnoWithDescription();
string s = ss.str();
- msgasserted( 13276 , s.c_str() );
+ msgassertedNoTrace( 13276 , s.c_str() );
}
-
int found = fscanf(f,
"%d %s %c "
"%d %d %d %d %d "
@@ -232,7 +231,7 @@ namespace mongo {
start = start - ( (unsigned long long)start % pageSize );
unsigned char x = 0;
if ( mincore( start , 128 , &x ) ){
- log() << "mincore failed: " << OUTPUT_ERRNO << endl;
+ log() << "mincore failed: " << errnoWithDescription() << endl;
return 1;
}
return x & 0x1;
diff --git a/util/processinfo_none.cpp b/util/processinfo_none.cpp
index 9af1766..b54cb13 100644
--- a/util/processinfo_none.cpp
+++ b/util/processinfo_none.cpp
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-#include "stdafx.h"
+#include "pch.h"
#include "processinfo.h"
#include <iostream>
diff --git a/util/processinfo_win32.cpp b/util/processinfo_win32.cpp
index 0705fcb..5fc6ab5 100644
--- a/util/processinfo_win32.cpp
+++ b/util/processinfo_win32.cpp
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-#include "stdafx.h"
+#include "pch.h"
#include "processinfo.h"
#include <iostream>
diff --git a/util/queue.h b/util/queue.h
index d48e012..35e02a8 100644
--- a/util/queue.h
+++ b/util/queue.h
@@ -1,4 +1,4 @@
-// queue.h
+// @file queue.h
/* Copyright 2009 10gen Inc.
*
@@ -17,7 +17,7 @@
#pragma once
-#include "../stdafx.h"
+#include "../pch.h"
#include "../util/goodies.h"
#include <queue>
@@ -29,6 +29,8 @@ namespace mongo {
*/
template<typename T> class BlockingQueue : boost::noncopyable {
public:
+ BlockingQueue() : _lock("BlockingQueue") { }
+
void push(T const& t){
scoped_lock l( _lock );
_queue.push( t );
diff --git a/util/ramlog.h b/util/ramlog.h
new file mode 100644
index 0000000..393527d
--- /dev/null
+++ b/util/ramlog.h
@@ -0,0 +1,142 @@
+// log.h
+
+/* Copyright 2009 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "log.h"
+#include "mongoutils/html.h"
+
+namespace mongo {
+
+ class RamLog : public Tee {
+ enum {
+ N = 128,
+ C = 256
+ };
+ char lines[N][C];
+ unsigned h, n;
+
+ public:
+ RamLog() {
+ h = 0; n = 0;
+ for( int i = 0; i < N; i++ )
+ lines[i][C-1] = 0;
+ }
+
+ virtual void write(LogLevel ll, const string& str) {
+ char *p = lines[(h+n)%N];
+ if( str.size() < C )
+ strcpy(p, str.c_str());
+ else
+ memcpy(p, str.c_str(), C-1);
+ if( n < N ) n++;
+ else h = (h+1) % N;
+ }
+
+ void get( vector<const char*>& v) const {
+ for( unsigned x=0, i=h; x++ < n; i=(i+1)%N )
+ v.push_back(lines[i]);
+ }
+
+ static int repeats(const vector<const char *>& v, int i) {
+ for( int j = i-1; j >= 0 && j+8 > i; j-- ) {
+ if( strcmp(v[i]+20,v[j]+20) == 0 ) {
+ for( int x = 1; ; x++ ) {
+ if( j+x == i ) return j;
+ if( i+x>=(int) v.size() ) return -1;
+ if( strcmp(v[i+x]+20,v[j+x]+20) ) return -1;
+ }
+ return -1;
+ }
+ }
+ return -1;
+ }
+
+
+ static string clean(const vector<const char *>& v, int i, string line="") {
+ if( line.empty() ) line = v[i];
+ if( i > 0 && strncmp(v[i], v[i-1], 11) == 0 )
+ return string(" ") + line.substr(11);
+ return v[i];
+ }
+
+ static string color(string line) {
+ string s = str::after(line, "replSet ");
+ if( str::startsWith(s, "warning") || startsWith(s, "error") )
+ return html::red(line);
+ if( str::startsWith(s, "info") ) {
+ if( str::endsWith(s, " up\n") )
+ return html::green(line);
+ else if( str::contains(s, " down ") || str::endsWith(s, " down\n") )
+ return html::yellow(line);
+ return line; //html::blue(line);
+ }
+
+ return line;
+ }
+
+ /* turn http:... into an anchor */
+ string linkify(const char *s) {
+ const char *p = s;
+ const char *h = strstr(p, "http://");
+ if( h == 0 ) return s;
+
+ const char *sp = h + 7;
+ while( *sp && *sp != ' ' ) sp++;
+
+ string url(h, sp-h);
+ stringstream ss;
+ ss << string(s, h-s) << "<a href=\"" << url << "\">" << url << "</a>" << sp;
+ return ss.str();
+ }
+
+ void toHTML(stringstream& s) {
+ vector<const char*> v;
+ get( v );
+
+ bool first = true;
+ s << "<pre>\n";
+ for( int i = 0; i < (int)v.size(); i++ ) {
+ assert( strlen(v[i]) > 20 );
+ int r = repeats(v, i);
+ if( r < 0 ) {
+ s << color( linkify( clean(v,i).c_str() ) );
+ }
+ else {
+ stringstream x;
+ x << string(v[i], 0, 20);
+ int nr = (i-r);
+ int last = i+nr-1;
+ for( ; r < i ; r++ ) x << '.';
+ if( 1 ) {
+ stringstream r;
+ if( nr == 1 ) r << "repeat last line";
+ else r << "repeats last " << nr << " lines; ends " << string(v[last]+4,0,15);
+ first = false; s << html::a("", r.str(), clean(v,i,x.str()));
+ }
+ else s << x.str();
+ s << '\n';
+ i = last;
+ }
+ }
+ s << "</pre>\n";
+ }
+
+
+ };
+
+}
diff --git a/util/ramstore.cpp b/util/ramstore.cpp
new file mode 100644
index 0000000..0bdf2e2
--- /dev/null
+++ b/util/ramstore.cpp
@@ -0,0 +1,93 @@
+/**
+* Copyright (C) 2008 10gen Inc.info
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "pch.h"
+#include "mmap.h"
+
+namespace mongo {
+
+ //extern bool checkNsFilesOnLoad;
+
+static set<RamStoreFile*> files;
+
+void RamStoreFile::grow(int offset, int len) {
+ cout << "GROW ofs:" << offset << " len:" << len;
+ assert( len > 0 );
+ Node& n = _m[offset];
+ cout << " oldlen:" << n.len << endl;
+ assert( n.len > 0 );
+ if( len > n.len ) {
+ n.p = (char *) realloc(n.p, len);
+ memset(((char *)n.p) + n.len, 0, len - n.len);
+ n.len = len;
+ }
+}
+
+/* maxLen can be -1 for existing data */
+void* RamStoreFile::at(int offset, int maxLen) {
+ if( offset != _last ) {
+ if( _m.count(_last) ) {
+ _m[_last].check();
+ if( !(offset < _last || offset >= _last + _m[_last].len) ) {
+ cout << offset << ' ' << _last << ' ' << _m[_last].len << endl;
+ assert(false);
+ }
+ }
+ }
+ _last = offset;
+
+ Node& n = _m[offset];
+ if( n.len == 0 ) {
+ // create
+ if( strstr(name, ".ns") == 0 )
+ cout << "CREATE " << name << " ofs:" << offset << " len:" << maxLen << endl;
+ assert( maxLen >= 0 );
+ n.p = (char *) calloc(maxLen+1, 1);
+ n.len = maxLen;
+ }
+ assert( n.len >= maxLen );
+ n.check();
+ return n.p;
+ }
+
+void RamStoreFile::Node::check() {
+ assert( p[len] == 0 );
+}
+
+void RamStoreFile::check() {
+ for( std::map<int,Node>::iterator i = _m.begin(); i != _m.end(); i++ ) {
+ i->second.check();
+ }
+}
+
+void RamStoreFile::validate() {
+ for( set<RamStoreFile*>::iterator i = files.begin(); i != files.end(); i++ ) {
+ (*i)->check();
+ }
+}
+
+RamStoreFile::~RamStoreFile() {
+ check();
+ files.erase(this);
+}
+
+RamStoreFile::RamStoreFile() : _len(0) {
+ // checkNsFilesOnLoad = false;
+ files.insert(this);
+}
+
+}
+
diff --git a/util/ramstore.h b/util/ramstore.h
new file mode 100644
index 0000000..f75a57a
--- /dev/null
+++ b/util/ramstore.h
@@ -0,0 +1,86 @@
+// ramstore.h
+
+// mmap.h
+
+/* Copyright 2009 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+extern bool checkNsFilesOnLoad;
+
+class RamStoreFile : public MongoFile {
+ char name[256];
+ struct Node {
+ char *p;
+ int len;
+ Node() : len(0) { }
+ void check();
+ };
+ std::map<int,Node> _m;
+ long _len;
+
+ static void validate();
+ void check();
+
+ int _last;
+
+ void grow(int offset, int len);
+
+ /* maxLen can be -1 for existing data */
+ void* at(int offset, int maxLen);
+
+protected:
+ virtual void close() {
+ cout << "ramstore dealloc not yet implemented" << endl;
+ if( _len ) {
+ _len = 0;
+ }
+ }
+ virtual void flush(bool sync) { }
+
+public:
+ ~RamStoreFile();
+ RamStoreFile();
+
+ virtual long length() { return _len; }
+
+ class Pointer {
+ RamStoreFile* _f;
+ friend class RamStoreFile;
+ public:
+ void* at(int offset, int len) {
+ assert( len <= /*MaxBSONObjectSize*/4*1024*1024 + 128 );
+ return _f->at(offset,len);
+ }
+ void grow(int offset, int len) {
+ assert( len <= /*MaxBSONObjectSize*/4*1024*1024 + 128 );
+ _f->grow(offset,len);
+ }
+ bool isNull() const { return _f == 0; }
+ };
+
+ Pointer map( const char *filename ) {
+ assert(false); return Pointer();
+ }
+ Pointer map(const char *_filename, long &length, int options=0) {
+ strncpy(name, _filename, sizeof(name)-1);
+ Pointer p;
+ p._f = this;
+ return p;
+ }
+
+ static bool exists(boost::filesystem::path p) {
+ return false;
+ }
+};
diff --git a/util/sock.cpp b/util/sock.cpp
index 5beac68..c4e1a71 100644
--- a/util/sock.cpp
+++ b/util/sock.cpp
@@ -15,31 +15,134 @@
* limitations under the License.
*/
-#include "stdafx.h"
+#include "pch.h"
#include "sock.h"
namespace mongo {
- static mongo::mutex sock_mutex;
+ static mongo::mutex sock_mutex("sock_mutex");
- string hostbyname(const char *hostname) {
- static string unknown = "0.0.0.0";
- if ( unknown == hostname )
- return unknown;
+ static bool ipv6 = false;
+ void enableIPv6(bool state) { ipv6 = state; }
+ bool IPv6Enabled() { return ipv6; }
- scoped_lock lk(sock_mutex);
-#if defined(_WIN32)
- if( inet_addr(hostname) != INADDR_NONE )
- return hostname;
-#else
- struct in_addr temp;
- if ( inet_aton( hostname, &temp ) )
- return hostname;
+ SockAddr::SockAddr(int sourcePort) {
+ memset(as<sockaddr_in>().sin_zero, 0, sizeof(as<sockaddr_in>().sin_zero));
+ as<sockaddr_in>().sin_family = AF_INET;
+ as<sockaddr_in>().sin_port = htons(sourcePort);
+ as<sockaddr_in>().sin_addr.s_addr = htonl(INADDR_ANY);
+ addressSize = sizeof(sockaddr_in);
+ }
+
+ SockAddr::SockAddr(const char * iporhost , int port) {
+ if (!strcmp(iporhost, "localhost"))
+ iporhost = "127.0.0.1";
+
+ if (strchr(iporhost, '/')){
+#ifdef _WIN32
+ uassert(13080, "no unix socket support on windows", false);
#endif
- struct hostent *h;
- h = gethostbyname(hostname);
- if ( h == 0 ) return "";
- return inet_ntoa( *((struct in_addr *)(h->h_addr)) );
+ uassert(13079, "path to unix socket too long", strlen(iporhost) < sizeof(as<sockaddr_un>().sun_path));
+ as<sockaddr_un>().sun_family = AF_UNIX;
+ strcpy(as<sockaddr_un>().sun_path, iporhost);
+ addressSize = sizeof(sockaddr_un);
+ }else{
+ addrinfo* addrs = NULL;
+ addrinfo hints;
+ memset(&hints, 0, sizeof(addrinfo));
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = AI_ADDRCONFIG;
+ hints.ai_family = (IPv6Enabled() ? AF_UNSPEC : AF_INET);
+
+ stringstream ss;
+ ss << port;
+ int ret = getaddrinfo(iporhost, ss.str().c_str(), &hints, &addrs);
+ if (ret){
+ log() << "getaddrinfo(\"" << iporhost << "\") failed: " << gai_strerror(ret) << endl;
+ *this = SockAddr(port);
+ }else{
+ //TODO: handle other addresses in linked list;
+ assert(addrs->ai_addrlen <= sizeof(sa));
+ memcpy(&sa, addrs->ai_addr, addrs->ai_addrlen);
+ addressSize = addrs->ai_addrlen;
+ freeaddrinfo(addrs);
+ }
+ }
+ }
+
+ bool SockAddr::isLocalHost() const {
+ switch (getType()){
+ case AF_INET: return getAddr() == "127.0.0.1";
+ case AF_INET6: return getAddr() == "::1";
+ case AF_UNIX: return true;
+ default: return false;
+ }
+ assert(false);
+ return false;
+ }
+
+ string hostbyname(const char *hostname) {
+ string addr = SockAddr(hostname, 0).getAddr();
+ if (addr == "0.0.0.0")
+ return "";
+ else
+ return addr;
+ }
+
+ class UDPConnection {
+ public:
+ UDPConnection() {
+ sock = 0;
+ }
+ ~UDPConnection() {
+ if ( sock ) {
+ closesocket(sock);
+ sock = 0;
+ }
+ }
+ bool init(const SockAddr& myAddr);
+ int recvfrom(char *buf, int len, SockAddr& sender);
+ int sendto(char *buf, int len, const SockAddr& EndPoint);
+ int mtu(const SockAddr& sa) {
+ return sa.isLocalHost() ? 16384 : 1480;
+ }
+
+ SOCKET sock;
+ };
+
+ inline int UDPConnection::recvfrom(char *buf, int len, SockAddr& sender) {
+ return ::recvfrom(sock, buf, len, 0, sender.raw(), &sender.addressSize);
+ }
+
+ inline int UDPConnection::sendto(char *buf, int len, const SockAddr& EndPoint) {
+ if ( 0 && rand() < (RAND_MAX>>4) ) {
+ out() << " NOTSENT ";
+ return 0;
+ }
+ return ::sendto(sock, buf, len, 0, EndPoint.raw(), EndPoint.addressSize);
+ }
+
+ inline bool UDPConnection::init(const SockAddr& myAddr) {
+ sock = socket(myAddr.getType(), SOCK_DGRAM, IPPROTO_UDP);
+ if ( sock == INVALID_SOCKET ) {
+ out() << "invalid socket? " << errnoWithDescription() << endl;
+ return false;
+ }
+ if ( ::bind(sock, myAddr.raw(), myAddr.addressSize) != 0 ) {
+ out() << "udp init failed" << endl;
+ closesocket(sock);
+ sock = 0;
+ return false;
+ }
+ socklen_t optLen;
+ int rcvbuf;
+ if (getsockopt(sock,
+ SOL_SOCKET,
+ SO_RCVBUF,
+ (char*)&rcvbuf,
+ &optLen) != -1)
+ out() << "SO_RCVBUF:" << rcvbuf << endl;
+ return true;
}
void sendtest() {
@@ -50,7 +153,7 @@ namespace mongo {
if ( c.init(me) ) {
char buf[256];
out() << "sendto: ";
- out() << c.sendto(buf, sizeof(buf), dest) << " " << OUTPUT_ERRNO << endl;
+ out() << c.sendto(buf, sizeof(buf), dest) << " " << errnoWithDescription() << endl;
}
out() << "end\n";
}
@@ -63,147 +166,44 @@ namespace mongo {
if ( c.init(me) ) {
char buf[256];
out() << "recvfrom: ";
- out() << c.recvfrom(buf, sizeof(buf), sender) << " " << OUTPUT_ERRNO << endl;
+ out() << c.recvfrom(buf, sizeof(buf), sender) << " " << errnoWithDescription() << endl;
}
out() << "end listentest\n";
}
void xmain();
- struct SockStartupTests {
- SockStartupTests() {
+
#if defined(_WIN32)
- WSADATA d;
- if ( WSAStartup(MAKEWORD(2,2), &d) != 0 ) {
- out() << "ERROR: wsastartup failed " << OUTPUT_ERRNO << endl;
- problem() << "ERROR: wsastartup failed " << OUTPUT_ERRNO << endl;
- dbexit( EXIT_NTSERVICE_ERROR );
+ namespace {
+ struct WinsockInit {
+ WinsockInit() {
+ WSADATA d;
+ if ( WSAStartup(MAKEWORD(2,2), &d) != 0 ) {
+ out() << "ERROR: wsastartup failed " << errnoWithDescription() << endl;
+ problem() << "ERROR: wsastartup failed " << errnoWithDescription() << endl;
+ dbexit( EXIT_NTSERVICE_ERROR );
+ }
}
-#endif
- //out() << "ntohl:" << ntohl(256) << endl;
- //sendtest();
- //listentest();
- }
- } sstests;
-
-#if 0
- void smain() {
-
- WSADATA wsaData;
- SOCKET RecvSocket;
- sockaddr_in RecvAddr;
- int Port = 27015;
- char RecvBuf[1024];
- int BufLen = 1024;
- sockaddr_in SenderAddr;
- int SenderAddrSize = sizeof(SenderAddr);
-
- //-----------------------------------------------
- // Initialize Winsock
- WSAStartup(MAKEWORD(2,2), &wsaData);
-
- //-----------------------------------------------
- // Create a receiver socket to receive datagrams
- RecvSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
- prebindOptions( RecvSocket );
-
- //-----------------------------------------------
- // Bind the socket to any address and the specified port.
- RecvAddr.sin_family = AF_INET;
- RecvAddr.sin_port = htons(Port);
- RecvAddr.sin_addr.s_addr = htonl(INADDR_ANY);
-
- ::bind(RecvSocket, (SOCKADDR *) &RecvAddr, sizeof(RecvAddr));
-
- //-----------------------------------------------
- // Call the recvfrom function to receive datagrams
- // on the bound socket.
- printf("Receiving datagrams...\n");
- recvfrom(RecvSocket,
- RecvBuf,
- BufLen,
- 0,
- (SOCKADDR *)&SenderAddr,
- &SenderAddrSize);
-
- //-----------------------------------------------
- // Close the socket when finished receiving datagrams
- printf("Finished receiving. Closing socket.\n");
- closesocket(RecvSocket);
-
- //-----------------------------------------------
- // Clean up and exit.
- printf("Exiting.\n");
- WSACleanup();
- return;
+ } winsock_init;
}
-
-
-
-
- void xmain() {
-
- WSADATA wsaData;
- SOCKET RecvSocket;
- sockaddr_in RecvAddr;
- int Port = 27015;
- char RecvBuf[1024];
- int BufLen = 1024;
- sockaddr_in SenderAddr;
- int SenderAddrSize = sizeof(SenderAddr);
-
- //-----------------------------------------------
- // Initialize Winsock
- WSAStartup(MAKEWORD(2,2), &wsaData);
-
- //-----------------------------------------------
- // Create a receiver socket to receive datagrams
-
- RecvSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
- prebindOptions( RecvSocket );
-
- //-----------------------------------------------
- // Bind the socket to any address and the specified port.
- RecvAddr.sin_family = AF_INET;
- RecvAddr.sin_port = htons(Port);
- RecvAddr.sin_addr.s_addr = htonl(INADDR_ANY);
-
- SockAddr a(Port);
- ::bind(RecvSocket, (SOCKADDR *) &a.sa, a.addressSize);
-// bind(RecvSocket, (SOCKADDR *) &RecvAddr, sizeof(RecvAddr));
-
- SockAddr b;
-
- //-----------------------------------------------
- // Call the recvfrom function to receive datagrams
- // on the bound socket.
- printf("Receiving datagrams...\n");
- recvfrom(RecvSocket,
- RecvBuf,
- BufLen,
- 0,
- (SOCKADDR *) &b.sa, &b.addressSize);
-// (SOCKADDR *)&SenderAddr,
-// &SenderAddrSize);
-
- //-----------------------------------------------
- // Close the socket when finished receiving datagrams
- printf("Finished receiving. Closing socket.\n");
- closesocket(RecvSocket);
-
- //-----------------------------------------------
- // Clean up and exit.
- printf("Exiting.\n");
- WSACleanup();
- return;
- }
-
#endif
+ SockAddr unknownAddress( "0.0.0.0", 0 );
+
ListeningSockets* ListeningSockets::_instance = new ListeningSockets();
ListeningSockets* ListeningSockets::get(){
return _instance;
}
+
+ string getHostNameCached(){
+ static string host;
+ if ( host.empty() ){
+ string s = getHostName();
+ host = s;
+ }
+ return host;
+ }
} // namespace mongo
diff --git a/util/sock.h b/util/sock.h
index ee7a7ae..300c24d 100644
--- a/util/sock.h
+++ b/util/sock.h
@@ -17,32 +17,43 @@
#pragma once
-#include "../stdafx.h"
+#include "../pch.h"
#include <stdio.h>
#include <sstream>
#include "goodies.h"
-
-#ifdef _WIN32
-#include <windows.h>
-#include <winsock.h>
-#endif
+#include "../db/jsobj.h"
namespace mongo {
+ const int SOCK_FAMILY_UNKNOWN_ERROR=13078;
+
#if defined(_WIN32)
+ typedef short sa_family_t;
typedef int socklen_t;
inline int getLastError() {
return WSAGetLastError();
}
+ inline const char* gai_strerror(int code) {
+ return ::gai_strerrorA(code);
+ }
inline void disableNagle(int sock) {
int x = 1;
if ( setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char *) &x, sizeof(x)) )
out() << "ERROR: disableNagle failed" << endl;
+ if ( setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (char *) &x, sizeof(x)) )
+ out() << "ERROR: SO_KEEPALIVE failed" << endl;
}
inline void prebindOptions( int sock ) {
}
+
+ // This won't actually be used on windows
+ struct sockaddr_un {
+ short sun_family;
+ char sun_path[108]; // length from unix header
+ };
+
#else
} // namespace mongo
@@ -50,11 +61,19 @@ namespace mongo {
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/socket.h>
+#include <sys/un.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <errno.h>
#include <netdb.h>
+#ifdef __openbsd__
+# include <sys/uio.h>
+#endif
+
+#ifndef AI_ADDRCONFIG
+# define AI_ADDRCONFIG 0
+#endif
namespace mongo {
@@ -74,7 +93,12 @@ namespace mongo {
#endif
if ( setsockopt(sock, level, TCP_NODELAY, (char *) &x, sizeof(x)) )
- log() << "ERROR: disableNagle failed" << endl;
+ log() << "ERROR: disableNagle failed: " << errnoWithDescription() << endl;
+
+#ifdef SO_KEEPALIVE
+ if ( setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (char *) &x, sizeof(x)) )
+ log() << "ERROR: SO_KEEPALIVE failed: " << errnoWithDescription() << endl;
+#endif
}
inline void prebindOptions( int sock ) {
@@ -87,161 +111,149 @@ namespace mongo {
#endif
- inline void setSockReceiveTimeout(int sock, int secs) {
-// todo - finish - works?
+ inline string makeUnixSockPath(int port){
+ return "/tmp/mongodb-" + BSONObjBuilder::numStr(port) + ".sock";
+ }
+
+ inline void setSockTimeouts(int sock, int secs) {
struct timeval tv;
- tv.tv_sec = 0;//secs;
- tv.tv_usec = 1000;
- int rc = setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (char *) &tv, sizeof(tv));
- if ( rc ) {
- out() << "ERROR: setsockopt RCVTIMEO failed rc:" << rc << " " << OUTPUT_ERRNO << " secs:" << secs << " sock:" << sock << endl;
- }
+ tv.tv_sec = secs;
+ tv.tv_usec = 0;
+ bool report = logLevel > 3; // solaris doesn't provide these
+ DEV report = true;
+ bool ok = setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (char *) &tv, sizeof(tv) ) == 0;
+ if( report && !ok ) log() << "unabled to set SO_RCVTIMEO" << endl;
+ ok = setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, (char *) &tv, sizeof(tv) ) == 0;
+ DEV if( report && !ok ) log() << "unabled to set SO_RCVTIMEO" << endl;
}
// If an ip address is passed in, just return that. If a hostname is passed
// in, look up its ip and return that. Returns "" on failure.
string hostbyname(const char *hostname);
+ void enableIPv6(bool state=true);
+ bool IPv6Enabled();
+
struct SockAddr {
SockAddr() {
- addressSize = sizeof(sockaddr_in);
+ addressSize = sizeof(sa);
memset(&sa, 0, sizeof(sa));
+ sa.ss_family = AF_UNSPEC;
}
SockAddr(int sourcePort); /* listener side */
SockAddr(const char *ip, int port); /* EndPoint (remote) side, or if you want to specify which interface locally */
- struct sockaddr_in sa;
- socklen_t addressSize;
+ template <typename T>
+ T& as() { return *(T*)(&sa); }
+ template <typename T>
+ const T& as() const { return *(const T*)(&sa); }
- bool isLocalHost() const {
-#if defined(_WIN32)
- return sa.sin_addr.S_un.S_addr == 0x100007f;
-#else
- return sa.sin_addr.s_addr == 0x100007f;
-#endif
+ string toString(bool includePort=true) const{
+ string out = getAddr();
+ if (includePort && getType() != AF_UNIX && getType() != AF_UNSPEC)
+ out += ':' + BSONObjBuilder::numStr(getPort());
+ return out;
}
- string toString() const{
- stringstream out;
- out << inet_ntoa(sa.sin_addr) << ':'
- << ntohs(sa.sin_port);
- return out.str();
+ // returns one of AF_INET, AF_INET6, or AF_UNIX
+ sa_family_t getType() const {
+ return sa.ss_family;
}
- operator string() const{
- return toString();
+ unsigned getPort() const {
+ switch (getType()){
+ case AF_INET: return ntohs(as<sockaddr_in>().sin_port);
+ case AF_INET6: return ntohs(as<sockaddr_in6>().sin6_port);
+ case AF_UNIX: return 0;
+ case AF_UNSPEC: return 0;
+ default: massert(SOCK_FAMILY_UNKNOWN_ERROR, "unsupported address family", false); return 0;
+ }
}
- unsigned getPort() {
- return sa.sin_port;
+ string getAddr() const {
+ switch (getType()){
+ case AF_INET:
+ case AF_INET6: {
+ const int buflen=128;
+ char buffer[buflen];
+ int ret = getnameinfo(raw(), addressSize, buffer, buflen, NULL, 0, NI_NUMERICHOST);
+ massert(13082, gai_strerror(ret), ret == 0);
+ return buffer;
+ }
+
+ case AF_UNIX: return (addressSize > 2 ? as<sockaddr_un>().sun_path : "anonymous unix socket");
+ case AF_UNSPEC: return "(NONE)";
+ default: massert(SOCK_FAMILY_UNKNOWN_ERROR, "unsupported address family", false); return "";
+ }
}
- bool localhost() const { return inet_addr( "127.0.0.1" ) == sa.sin_addr.s_addr; }
+ bool isLocalHost() const;
bool operator==(const SockAddr& r) const {
- return sa.sin_addr.s_addr == r.sa.sin_addr.s_addr &&
- sa.sin_port == r.sa.sin_port;
+ if (getType() != r.getType())
+ return false;
+
+ if (getPort() != r.getPort())
+ return false;
+
+ switch (getType()){
+ case AF_INET: return as<sockaddr_in>().sin_addr.s_addr == r.as<sockaddr_in>().sin_addr.s_addr;
+ case AF_INET6: return memcmp(as<sockaddr_in6>().sin6_addr.s6_addr, r.as<sockaddr_in6>().sin6_addr.s6_addr, sizeof(in6_addr)) == 0;
+ case AF_UNIX: return strcmp(as<sockaddr_un>().sun_path, r.as<sockaddr_un>().sun_path) == 0;
+ case AF_UNSPEC: return true; // assume all unspecified addresses are the same
+ default: massert(SOCK_FAMILY_UNKNOWN_ERROR, "unsupported address family", false);
+ }
}
bool operator!=(const SockAddr& r) const {
return !(*this == r);
}
bool operator<(const SockAddr& r) const {
- if ( sa.sin_port >= r.sa.sin_port )
+ if (getType() < r.getType())
+ return true;
+ else if (getType() > r.getType())
return false;
- return sa.sin_addr.s_addr < r.sa.sin_addr.s_addr;
- }
- };
- const int MaxMTU = 16384;
+ if (getPort() < r.getPort())
+ return true;
+ else if (getPort() > r.getPort())
+ return false;
- class UDPConnection {
- public:
- UDPConnection() {
- sock = 0;
- }
- ~UDPConnection() {
- if ( sock ) {
- closesocket(sock);
- sock = 0;
+ switch (getType()){
+ case AF_INET: return as<sockaddr_in>().sin_addr.s_addr < r.as<sockaddr_in>().sin_addr.s_addr;
+ case AF_INET6: return memcmp(as<sockaddr_in6>().sin6_addr.s6_addr, r.as<sockaddr_in6>().sin6_addr.s6_addr, sizeof(in6_addr)) < 0;
+ case AF_UNIX: return strcmp(as<sockaddr_un>().sun_path, r.as<sockaddr_un>().sun_path) < 0;
+ case AF_UNSPEC: return false;
+ default: massert(SOCK_FAMILY_UNKNOWN_ERROR, "unsupported address family", false);
}
}
- bool init(const SockAddr& myAddr);
- int recvfrom(char *buf, int len, SockAddr& sender);
- int sendto(char *buf, int len, const SockAddr& EndPoint);
- int mtu(const SockAddr& sa) {
- return sa.isLocalHost() ? 16384 : 1480;
- }
-
- SOCKET sock;
- };
-
- inline int UDPConnection::recvfrom(char *buf, int len, SockAddr& sender) {
- return ::recvfrom(sock, buf, len, 0, (sockaddr *) &sender.sa, &sender.addressSize);
- }
- inline int UDPConnection::sendto(char *buf, int len, const SockAddr& EndPoint) {
- if ( 0 && rand() < (RAND_MAX>>4) ) {
- out() << " NOTSENT ";
- // out() << curTimeMillis() << " .TEST: NOT SENDING PACKET" << endl;
- return 0;
- }
- return ::sendto(sock, buf, len, 0, (sockaddr *) &EndPoint.sa, EndPoint.addressSize);
- }
+ const sockaddr* raw() const {return (sockaddr*)&sa;}
+ sockaddr* raw() {return (sockaddr*)&sa;}
- inline bool UDPConnection::init(const SockAddr& myAddr) {
- sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
- if ( sock == INVALID_SOCKET ) {
- out() << "invalid socket? " << OUTPUT_ERRNO << endl;
- return false;
- }
- //out() << sizeof(sockaddr_in) << ' ' << myAddr.addressSize << endl;
- if ( ::bind(sock, (sockaddr *) &myAddr.sa, myAddr.addressSize) != 0 ) {
- out() << "udp init failed" << endl;
- closesocket(sock);
- sock = 0;
- return false;
- }
- socklen_t optLen;
- int rcvbuf;
- if (getsockopt(sock,
- SOL_SOCKET,
- SO_RCVBUF,
- (char*)&rcvbuf,
- &optLen) != -1)
- out() << "SO_RCVBUF:" << rcvbuf << endl;
- return true;
- }
+ socklen_t addressSize;
+ private:
+ struct sockaddr_storage sa;
+ };
- inline SockAddr::SockAddr(int sourcePort) {
- memset(sa.sin_zero, 0, sizeof(sa.sin_zero));
- sa.sin_family = AF_INET;
- sa.sin_port = htons(sourcePort);
- sa.sin_addr.s_addr = htonl(INADDR_ANY);
- addressSize = sizeof(sa);
- }
+ extern SockAddr unknownAddress; // ( "0.0.0.0", 0 )
- inline SockAddr::SockAddr(const char * iporhost , int port) {
- string ip = hostbyname( iporhost );
- memset(sa.sin_zero, 0, sizeof(sa.sin_zero));
- sa.sin_family = AF_INET;
- sa.sin_port = htons(port);
- sa.sin_addr.s_addr = inet_addr(ip.c_str());
- addressSize = sizeof(sa);
- }
+ const int MaxMTU = 16384;
inline string getHostName() {
char buf[256];
int ec = gethostname(buf, 127);
if ( ec || *buf == 0 ) {
- log() << "can't get this server's hostname " << OUTPUT_ERRNO << endl;
+ log() << "can't get this server's hostname " << errnoWithDescription() << endl;
return "";
}
return buf;
}
+ string getHostNameCached();
+
class ListeningSockets {
public:
- ListeningSockets() : _sockets( new set<int>() ){
+ ListeningSockets() : _mutex("ListeningSockets"), _sockets( new set<int>() ){
}
void add( int sock ){
diff --git a/util/stringutils.cpp b/util/stringutils.cpp
new file mode 100644
index 0000000..3f989fd
--- /dev/null
+++ b/util/stringutils.cpp
@@ -0,0 +1,44 @@
+// stringutils.cpp
+
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "pch.h"
+
+namespace mongo {
+
+ void splitStringDelim( const string& str , vector<string>* res , char delim ){
+ if ( str.empty() )
+ return;
+
+ size_t beg = 0;
+ size_t pos = str.find( delim );
+ while ( pos != string::npos ){
+ res->push_back( str.substr( beg, pos - beg) );
+ beg = ++pos;
+ pos = str.find( delim, beg );
+ }
+ res->push_back( str.substr( beg ) );
+ }
+
+ void joinStringDelim( const vector<string>& strs , string* res , char delim ){
+ for ( vector<string>::const_iterator it = strs.begin(); it != strs.end(); ++it ){
+ if ( it !=strs.begin() ) res->push_back( delim );
+ res->append( *it );
+ }
+ }
+
+} // namespace mongo
diff --git a/util/stringutils.h b/util/stringutils.h
new file mode 100644
index 0000000..6b79c33
--- /dev/null
+++ b/util/stringutils.h
@@ -0,0 +1,43 @@
+// stringutils.h
+
+/* Copyright 2010 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef UTIL_STRING_UTILS_HEADER
+#define UTIL_STRING_UTILS_HEADER
+
+namespace mongo {
+
+ void splitStringDelim( const string& str , vector<string>* res , char delim );
+
+ void joinStringDelim( const vector<string>& strs , string* res , char delim );
+
+ inline string tolowerString( const string& input ){
+ string::size_type sz = input.size();
+
+ boost::scoped_array<char> line(new char[sz+1]);
+ char * copy = line.get();
+
+ for ( string::size_type i=0; i<sz; i++ ){
+ char c = input[i];
+ copy[i] = (char)tolower( (int)c );
+ }
+ copy[sz] = 0;
+ return string(copy);
+ }
+
+} // namespace mongo
+
+#endif // UTIL_STRING_UTILS_HEADER
diff --git a/util/text.cpp b/util/text.cpp
new file mode 100644
index 0000000..f381e01
--- /dev/null
+++ b/util/text.cpp
@@ -0,0 +1,117 @@
+// text.cpp
+
+/* Copyright 2009 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "pch.h"
+#include "text.h"
+#include "unittest.h"
+
+namespace mongo{
+
+ inline int leadingOnes(unsigned char c){
+ if (c < 0x80) return 0;
+ static const char _leadingOnes[128] = {
+ 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, // 0x80 - 0x8F
+ 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, // 0x90 - 0x99
+ 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, // 0xA0 - 0xA9
+ 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, // 0xB0 - 0xB9
+ 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, // 0xC0 - 0xC9
+ 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, // 0xD0 - 0xD9
+ 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, // 0xE0 - 0xE9
+ 4, 4, 4, 4, 4, 4, 4, 4, // 0xF0 - 0xF7
+ 5, 5, 5, 5, // 0xF8 - 0xFB
+ 6, 6, // 0xFC - 0xFD
+ 7, // 0xFE
+ 8, // 0xFF
+ };
+ return _leadingOnes[c & 0x7f];
+
+ }
+
+ bool isValidUTF8(const char *s){
+ int left = 0; // how many bytes are left in the current codepoint
+ while (*s){
+ const unsigned char c = (unsigned char) *(s++);
+ const int ones = leadingOnes(c);
+ if (left){
+ if (ones != 1) return false; // should be a continuation byte
+ left--;
+ }else{
+ if (ones == 0) continue; // ASCII byte
+ if (ones == 1) return false; // unexpected continuation byte
+ if (c > 0xF4) return false; // codepoint too large (< 0x10FFFF)
+ if (c == 0xC0 || c == 0xC1) return false; // codepoints <= 0x7F shouldn't be 2 bytes
+
+ // still valid
+ left = ones-1;
+ }
+ }
+ if (left!=0) return false; // string ended mid-codepoint
+ return true;
+ }
+
+ #if defined(_WIN32)
+
+ std::string toUtf8String(const std::wstring& wide)
+ {
+ if (wide.size() > boost::integer_traits<int>::const_max)
+ throw std::length_error(
+ "Wide string cannot be more than INT_MAX characters long.");
+ if (wide.size() == 0)
+ return "";
+
+ // Calculate necessary buffer size
+ int len = ::WideCharToMultiByte(
+ CP_UTF8, 0, wide.c_str(), static_cast<int>(wide.size()),
+ NULL, 0, NULL, NULL);
+
+ // Perform actual conversion
+ if (len > 0)
+ {
+ std::vector<char> buffer(len);
+ len = ::WideCharToMultiByte(
+ CP_UTF8, 0, wide.c_str(), static_cast<int>(wide.size()),
+ &buffer[0], static_cast<int>(buffer.size()), NULL, NULL);
+ if (len > 0)
+ {
+ assert(len == static_cast<int>(buffer.size()));
+ return std::string(&buffer[0], buffer.size());
+ }
+ }
+
+ throw boost::system::system_error(
+ ::GetLastError(), boost::system::system_category);
+ }
+
+#if defined(_UNICODE)
+ std::wstring toWideString(const char *s) {
+ std::basic_ostringstream<TCHAR> buf;
+ buf << s;
+ return buf.str();
+ }
+#endif
+
+ #endif
+
+ struct TextUnitTest : public UnitTest {
+ void run() {
+ assert( parseLL("123") == 123 );
+ assert( parseLL("-123000000000") == -123000000000LL );
+ }
+ } textUnitTest;
+
+}
+
diff --git a/util/text.h b/util/text.h
new file mode 100644
index 0000000..4ba622f
--- /dev/null
+++ b/util/text.h
@@ -0,0 +1,142 @@
+// text.h
+/*
+ * Copyright 2010 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* Copyright 2009 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+namespace mongo {
+
+ class StringSplitter {
+ public:
+ StringSplitter( const char * big , const char * splitter )
+ : _big( big ) , _splitter( splitter ){
+ }
+
+ bool more(){
+ return _big[0];
+ }
+
+ string next(){
+ const char * foo = strstr( _big , _splitter );
+ if ( foo ){
+ string s( _big , foo - _big );
+ _big = foo + 1;
+ while ( *_big && strstr( _big , _splitter ) == _big )
+ _big++;
+ return s;
+ }
+
+ string s = _big;
+ _big += strlen( _big );
+ return s;
+ }
+
+ void split( vector<string>& l ){
+ while ( more() ){
+ l.push_back( next() );
+ }
+ }
+
+ vector<string> split(){
+ vector<string> l;
+ split( l );
+ return l;
+ }
+
+ static vector<string> split( const string& big , const string& splitter ){
+ StringSplitter ss( big.c_str() , splitter.c_str() );
+ return ss.split();
+ }
+
+ static string join( vector<string>& l , const string& split ){
+ stringstream ss;
+ for ( unsigned i=0; i<l.size(); i++ ){
+ if ( i > 0 )
+ ss << split;
+ ss << l[i];
+ }
+ return ss.str();
+ }
+
+ private:
+ const char * _big;
+ const char * _splitter;
+ };
+
+ /* This doesn't defend against ALL bad UTF8, but it will guarantee that the
+ * string can be converted to sequence of codepoints. However, it doesn't
+ * guarantee that the codepoints are valid.
+ */
+ bool isValidUTF8(const char *s);
+ inline bool isValidUTF8(string s) { return isValidUTF8(s.c_str()); }
+
+#if defined(_WIN32)
+
+ std::string toUtf8String(const std::wstring& wide);
+
+ std::wstring toWideString(const char *s);
+
+ /* like toWideString but UNICODE macro sensitive */
+# if !defined(_UNICODE)
+#error temp error
+ inline std::string toNativeString(const char *s) { return s; }
+# else
+ inline std::wstring toNativeString(const char *s) { return toWideString(s); }
+# endif
+
+#endif
+
+ // expect that n contains a base ten number and nothing else after it
+ // NOTE win version hasn't been tested directly
+ inline long long parseLL( const char *n ) {
+ long long ret;
+ uassert( 13307, "cannot convert empty string to long long", *n != 0 );
+#if !defined(_WIN32)
+ char *endPtr = 0;
+ errno = 0;
+ ret = strtoll( n, &endPtr, 10 );
+ uassert( 13305, "could not convert string to long long", *endPtr == 0 && errno == 0 );
+#elif _MSC_VER>=1600 // 1600 is VS2k10 1500 is VS2k8
+ size_t endLen = 0;
+ try {
+ ret = stoll( n, &endLen, 10 );
+ } catch ( ... ) {
+ endLen = 0;
+ }
+ uassert( 13306, "could not convert string to long long", endLen != 0 && n[ endLen ] == 0 );
+#else // stoll() wasn't introduced until VS 2010.
+ char* endPtr = 0;
+ ret = _strtoi64( n, &endPtr, 10 );
+ uassert( 13310, "could not convert string to long long", (*endPtr == 0) && (ret != _I64_MAX) && (ret != _I64_MIN) );
+#endif // !defined(_WIN32)
+ return ret;
+ }
+}
diff --git a/util/thread_pool.cpp b/util/thread_pool.cpp
deleted file mode 100644
index 77d0d05..0000000
--- a/util/thread_pool.cpp
+++ /dev/null
@@ -1,139 +0,0 @@
-/* threadpool.cpp
-*/
-
-/* Copyright 2009 10gen Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "stdafx.h"
-#include "thread_pool.h"
-#include "mvar.h"
-
-
-namespace mongo{
-namespace threadpool{
-
-// Worker thread
-class Worker : boost::noncopyable {
-public:
- explicit Worker(ThreadPool& owner)
- : _owner(owner)
- , _is_done(true)
- , _thread(boost::bind(&Worker::loop, this))
- {}
-
- // destructor will block until current operation is completed
- // Acts as a "join" on this thread
- ~Worker(){
- _task.put(Task());
- _thread.join();
- }
-
- void set_task(Task& func){
- assert(!func.empty());
- assert(_is_done);
- _is_done = false;
-
- _task.put(func);
- }
-
- private:
- ThreadPool& _owner;
- MVar<Task> _task;
- bool _is_done; // only used for error detection
- boost::thread _thread;
-
- void loop(){
- while (true) {
- Task task = _task.take();
- if (task.empty())
- break; // ends the thread
-
- try {
- task();
- } catch (std::exception e){
- log() << "Unhandled exception in worker thread: " << e.what() << endl;;
- } catch (...){
- log() << "Unhandled non-exception in worker thread" << endl;
- }
- _is_done = true;
- _owner.task_done(this);
- }
- }
-};
-
-ThreadPool::ThreadPool(int nThreads)
- : _tasksRemaining(0)
- , _nThreads(nThreads)
-{
- scoped_lock lock(_mutex);
- while (nThreads-- > 0){
- Worker* worker = new Worker(*this);
- _freeWorkers.push_front(worker);
- }
-}
-
-ThreadPool::~ThreadPool(){
- join();
-
- assert(_tasks.empty());
-
- // O(n) but n should be small
- assert(_freeWorkers.size() == (unsigned)_nThreads);
-
- while(!_freeWorkers.empty()){
- delete _freeWorkers.front();
- _freeWorkers.pop_front();
- }
-}
-
-void ThreadPool::join(){
- scoped_lock lock(_mutex);
- while(_tasksRemaining){
- _condition.wait(lock.boost());
- }
-}
-
-void ThreadPool::schedule(Task task){
- scoped_lock lock(_mutex);
-
- _tasksRemaining++;
-
- if (!_freeWorkers.empty()){
- _freeWorkers.front()->set_task(task);
- _freeWorkers.pop_front();
- }else{
- _tasks.push_back(task);
- }
-}
-
-// should only be called by a worker from the worker thread
-void ThreadPool::task_done(Worker* worker){
- scoped_lock lock(_mutex);
-
- if (!_tasks.empty()){
- worker->set_task(_tasks.front());
- _tasks.pop_front();
- }else{
- _freeWorkers.push_front(worker);
- }
-
- _tasksRemaining--;
-
- if(_tasksRemaining == 0)
- _condition.notify_all();
-}
-
-} //namespace threadpool
-} //namespace mongo
diff --git a/util/util.cpp b/util/util.cpp
index 8ae00f3..02abfa9 100644
--- a/util/util.cpp
+++ b/util/util.cpp
@@ -1,4 +1,4 @@
-// util.cpp
+// @file util.cpp
/* Copyright 2009 10gen Inc.
*
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-#include "stdafx.h"
+#include "pch.h"
#include "goodies.h"
#include "unittest.h"
#include "file_allocator.h"
@@ -23,18 +23,71 @@
namespace mongo {
- vector<UnitTest*> *UnitTest::tests = 0;
- bool UnitTest::running = false;
+ boost::thread_specific_ptr<string> _threadName;
+
+ void _setThreadName( const char * name ){
+ static int N = 0;
+ if ( strcmp( name , "conn" ) == 0 ){
+ stringstream ss;
+ ss << name << ++N;
+ _threadName.reset( new string( ss.str() ) );
+ }
+ else {
+ _threadName.reset( new string(name) );
+ }
+ }
+
+#if defined(_WIN32)
+#define MS_VC_EXCEPTION 0x406D1388
+#pragma pack(push,8)
+ typedef struct tagTHREADNAME_INFO
+ {
+ DWORD dwType; // Must be 0x1000.
+ LPCSTR szName; // Pointer to name (in user addr space).
+ DWORD dwThreadID; // Thread ID (-1=caller thread).
+ DWORD dwFlags; // Reserved for future use, must be zero.
+ } THREADNAME_INFO;
+#pragma pack(pop)
+
+ void setThreadName(const char *name)
+ {
+ _setThreadName( name );
+ Sleep(10);
+ THREADNAME_INFO info;
+ info.dwType = 0x1000;
+ info.szName = name;
+ info.dwThreadID = -1;
+ info.dwFlags = 0;
+ __try
+ {
+ RaiseException( MS_VC_EXCEPTION, 0, sizeof(info)/sizeof(ULONG_PTR), (ULONG_PTR*)&info );
+ }
+ __except(EXCEPTION_EXECUTE_HANDLER)
+ {
+ }
+ }
+#else
+ void setThreadName(const char * name ) {
+ _setThreadName( name );
+ }
+#endif
- Nullstream nullstream;
+ string getThreadName(){
+ string * s = _threadName.get();
+ if ( s )
+ return *s;
+ return "";
+ }
- thread_specific_ptr<Logstream> Logstream::tsp;
+ vector<UnitTest*> *UnitTest::tests = 0;
+ bool UnitTest::running = false;
const char *default_getcurns() { return ""; }
const char * (*getcurns)() = default_getcurns;
int logLevel = 0;
- mongo::mutex Logstream::mutex;
+ int tlogLevel = 0;
+ mongo::mutex Logstream::mutex("Logstream");
int Logstream::doneSetup = Logstream::magicNumber();
bool goingAway = false;
@@ -106,35 +159,21 @@ namespace mongo {
void rawOut( const string &s ) {
if( s.empty() ) return;
- char now[64];
- time_t_to_String(time(0), now);
- now[20] = 0;
-#if defined(_WIN32)
- (std::cout << now << " " << s).flush();
-#else
- write( STDOUT_FILENO, now, 20 );
- write( STDOUT_FILENO, " ", 1 );
- write( STDOUT_FILENO, s.c_str(), s.length() );
- fsync( STDOUT_FILENO );
-#endif
- }
-#ifndef _SCONS
- // only works in scons
- const char * gitVersion(){ return ""; }
- const char * sysInfo(){ return ""; }
-#endif
+ boost::scoped_array<char> buf_holder(new char[32 + s.size()]);
+ char * buf = buf_holder.get();
+
+ time_t_to_String( time(0) , buf );
+ buf[20] = ' ';
+ strncpy( buf + 21 , s.c_str() , s.size() );
+ buf[21+s.size()] = '\n';
+ buf[21+s.size()+1] = 0;
- void printGitVersion() { log() << "git version: " << gitVersion() << endl; }
- void printSysInfo() { log() << "sys info: " << sysInfo() << endl; }
- string mongodVersion() {
- stringstream ss;
- ss << "db version v" << versionString << ", pdfile version " << VERSION << "." << VERSION_MINOR;
- return ss.str();
+ Logstream::logLockless( buf );
}
ostream& operator<<( ostream &s, const ThreadSafeString &o ){
- s << (string)o;
+ s << o.toString();
return s;
}
diff --git a/util/version.cpp b/util/version.cpp
new file mode 100644
index 0000000..c49c064
--- /dev/null
+++ b/util/version.cpp
@@ -0,0 +1,86 @@
+#include "pch.h"
+
+#include <cstdlib>
+#include <iostream>
+#include <iomanip>
+#include <sstream>
+#include <string>
+
+#include "version.h"
+
+namespace mongo {
+
+ //
+ // mongo processes version support
+ //
+
+ const char versionString[] = "1.6.0";
+
+ string mongodVersion() {
+ stringstream ss;
+ ss << "db version v" << versionString << ", pdfile version " << VERSION << "." << VERSION_MINOR;
+ return ss.str();
+ }
+
+ //
+ // git version support
+ //
+
+#ifndef _SCONS
+ // only works in scons
+ const char * gitVersion(){ return "not-scons"; }
+#endif
+
+ void printGitVersion() { log() << "git version: " << gitVersion() << endl; }
+
+ //
+ // sys info support
+ //
+
+#ifndef _SCONS
+#if defined(_WIN32)
+ string sysInfo(){
+ stringstream ss;
+ ss << "not-scons win";
+ ss << " mscver:" << _MSC_FULL_VER << " built:" << __DATE__;
+ ss << " boostver:" << BOOST_VERSION;
+#if( !defined(_MT) )
+#error _MT is not defined
+#endif
+ ss << (sizeof(char *) == 8) ? " 64bit" : " 32bit";
+ return ss.str();
+ }
+#else
+ string sysInfo(){ return ""; }
+#endif
+#endif
+
+ void printSysInfo() { log() << "sys info: " << sysInfo() << endl; }
+
+ //
+ // 32 bit systems warning
+ //
+
+ void show_32_warning(){
+ bool warned = false;
+ {
+ const char * foo = strchr( versionString , '.' ) + 1;
+ int bar = atoi( foo );
+ if ( ( 2 * ( bar / 2 ) ) != bar ) {
+ cout << "\n** NOTE: This is a development version (" << versionString << ") of MongoDB.";
+ cout << "\n** Not recommended for production. \n" << endl;
+ warned = true;
+ }
+ }
+
+ if ( sizeof(int*) != 4 )
+ return;
+
+ if( !warned ) // prettier this way
+ cout << endl;
+ cout << "** NOTE: when using MongoDB 32 bit, you are limited to about 2 gigabytes of data" << endl;
+ cout << "** see http://blog.mongodb.org/post/137788967/32-bit-limitations" << endl;
+ cout << endl;
+ }
+
+}
diff --git a/util/version.h b/util/version.h
new file mode 100644
index 0000000..70ddeb8
--- /dev/null
+++ b/util/version.h
@@ -0,0 +1,24 @@
+#ifndef UTIL_VERSION_HEADER
+#define UTIL_VERSION_HEADER
+
+#include <string>
+
+namespace mongo {
+
+ using std::string;
+
+ // mongo version
+ extern const char versionString[];
+ string mongodVersion();
+
+ const char * gitVersion();
+ void printGitVersion();
+
+ string sysInfo();
+ void printSysInfo();
+
+ void show_32_warning();
+
+} // namespace mongo
+
+#endif // UTIL_VERSION_HEADER
diff --git a/util/winutil.h b/util/winutil.h
new file mode 100644
index 0000000..b69b69a
--- /dev/null
+++ b/util/winutil.h
@@ -0,0 +1,44 @@
+// @file winutil.cpp : Windows related utility functions
+//
+// /**
+// * Copyright (C) 2008 10gen Inc.
+// *
+// * This program is free software: you can redistribute it and/or modify
+// * it under the terms of the GNU Affero General Public License, version 3,
+// * as published by the Free Software Foundation.
+// *
+// * This program is distributed in the hope that it will be useful,
+// * but WITHOUT ANY WARRANTY; without even the implied warranty of
+// * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// * GNU Affero General Public License for more details.
+// *
+// * You should have received a copy of the GNU Affero General Public License
+// * along with this program. If not, see <http://www.gnu.org/licenses/>.
+// */
+//
+// #include "pch.h"
+
+#pragma once
+
+#if defined(_WIN32)
+#include <windows.h>
+#include "text.h"
+
+namespace mongo {
+
+ inline string GetWinErrMsg(DWORD err) {
+ LPTSTR errMsg;
+ ::FormatMessage( FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM, NULL, err, 0, (LPTSTR)&errMsg, 0, NULL );
+ std::string errMsgStr = toUtf8String( errMsg );
+ ::LocalFree( errMsg );
+ // FormatMessage() appends a newline to the end of error messages, we trim it because endl flushes the buffer.
+ errMsgStr = errMsgStr.erase( errMsgStr.length() - 2 );
+ std::ostringstream output;
+ output << errMsgStr << " (" << err << ")";
+
+ return output.str();
+ }
+}
+
+#endif
+