diff options
author | Antonin Kral <a.kral@bobek.cz> | 2010-08-11 12:38:57 +0200 |
---|---|---|
committer | Antonin Kral <a.kral@bobek.cz> | 2010-08-11 12:38:57 +0200 |
commit | 7645618fd3914cb8a20561625913c20d49504a49 (patch) | |
tree | 8370f846f58f6d71165b7a0e2eda04648584ec76 /util | |
parent | 68c73c3c7608b4c87f07440dc3232801720b1168 (diff) | |
download | mongodb-7645618fd3914cb8a20561625913c20d49504a49.tar.gz |
Imported Upstream version 1.6.0
Diffstat (limited to 'util')
85 files changed, 5672 insertions, 1680 deletions
diff --git a/util/allocator.h b/util/allocator.h index 90dbf24..2c07973 100644 --- a/util/allocator.h +++ b/util/allocator.h @@ -31,7 +31,9 @@ namespace mongo { return x; } -#define malloc mongo::ourmalloc -#define realloc mongo::ourrealloc +#define MONGO_malloc mongo::ourmalloc +#define malloc MONGO_malloc +#define MONGO_realloc mongo::ourrealloc +#define realloc MONGO_realloc } // namespace mongo diff --git a/util/array.h b/util/array.h index 827c00e..8da06fe 100644 --- a/util/array.h +++ b/util/array.h @@ -1,5 +1,21 @@ // array.h +/* + * Copyright 2010 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + namespace mongo { template<typename T> @@ -70,7 +86,7 @@ namespace mongo { return _it->_data[_pos]; } - operator string() const { + string toString() const { stringstream ss; ss << _pos; return ss.str(); diff --git a/util/assert_util.cpp b/util/assert_util.cpp index b4659cc..faa18cb 100644 --- a/util/assert_util.cpp +++ b/util/assert_util.cpp @@ -15,10 +15,20 @@ * limitations under the License. */ -#include "stdafx.h" +#include "pch.h" #include "assert_util.h" #include "assert.h" #include "file.h" +#include <cmath> +using namespace std; + +#ifndef _WIN32 +#include <cxxabi.h> +#include <sys/file.h> +#endif + +//#include "../bson/bson.h" +#include "../db/jsobj.h" namespace mongo { @@ -41,6 +51,17 @@ namespace mongo { if ( newvalue >= max ) rollover(); } + + void ExceptionInfo::append( BSONObjBuilder& b , const char * m , const char * c ) const { + if ( msg.empty() ) + b.append( m , "unknown assertion" ); + else + b.append( m , msg ); + + if ( code ) + b.append( c , code ); + } + string getDbContext(); @@ -63,8 +84,7 @@ namespace mongo { lastAssert[0].set(msg, getDbContext().c_str(), file, line); stringstream temp; temp << "assertion " << file << ":" << line; - AssertionException e; - e.msg = temp.str(); + AssertionException e(temp.str(),0); breakpoint(); throw e; } @@ -74,13 +94,8 @@ namespace mongo { raiseError(0,msg); } - int uacount = 0; void uasserted(int msgid, const char *msg) { assertionCount.condrollover( ++assertionCount.user ); - if ( ++uacount < 100 ) - log() << "User Exception " << msgid << ":" << msg << endl; - else - RARELY log() << "User Exception " << msg << endl; lastAssert[3].set(msg, getDbContext().c_str(), "", 0); raiseError(msgid,msg); throw UserException(msgid, msg); @@ -88,11 +103,19 @@ namespace mongo { void msgasserted(int msgid, const char *msg) { assertionCount.condrollover( ++assertionCount.warning ); - log() << "Assertion: " << msgid << ":" << msg << endl; + tlog() << "Assertion: " << msgid << ":" << msg << endl; lastAssert[2].set(msg, getDbContext().c_str(), "", 0); raiseError(msgid,msg && *msg ? msg : "massert failure"); breakpoint(); - printStackTrace(); // TEMP?? should we get rid of this? TODO + printStackTrace(); + throw MsgAssertionException(msgid, msg); + } + + void msgassertedNoTrace(int msgid, const char *msg) { + assertionCount.condrollover( ++assertionCount.warning ); + log() << "Assertion: " << msgid << ":" << msg << endl; + lastAssert[2].set(msg, getDbContext().c_str(), "", 0); + raiseError(msgid,msg && *msg ? msg : "massert failure"); throw MsgAssertionException(msgid, msg); } @@ -100,12 +123,11 @@ namespace mongo { stringstream ss; // errno might not work on all systems for streams // if it doesn't for a system should deal with here - ss << msg << " stream invalie: " << OUTPUT_ERRNO; + ss << msg << " stream invalid: " << errnoWithDescription(); throw UserException( code , ss.str() ); } - - mongo::mutex *Assertion::_mutex = new mongo::mutex(); + mongo::mutex *Assertion::_mutex = new mongo::mutex("Assertion"); string Assertion::toString() { if( _mutex == 0 ) @@ -125,89 +147,31 @@ namespace mongo { return ss.str(); } - - class LoggingManager { - public: - LoggingManager() - : _enabled(0) , _file(0) { - } - - void start( const string& lp , bool append ){ - uassert( 10268 , "LoggingManager already started" , ! _enabled ); - _append = append; - - // test path - FILE * test = fopen( lp.c_str() , _append ? "a" : "w" ); - if ( ! test ){ - cout << "can't open [" << lp << "] for log file" << endl; - dbexit( EXIT_BADOPTIONS ); - assert( 0 ); - } - fclose( test ); - - _path = lp; - _enabled = 1; - rotate(); - } - - void rotate(){ - if ( ! _enabled ){ - cout << "LoggingManager not enabled" << endl; - return; - } + string errnoWithPrefix( const char * prefix ){ + stringstream ss; + if ( prefix ) + ss << prefix << ": "; + ss << errnoWithDescription(); + return ss.str(); + } + - if ( _file ){ + string demangleName( const type_info& typeinfo ){ #ifdef _WIN32 - cout << "log rotation doesn't work on windows" << endl; - return; + return typeinfo.name(); #else - struct tm t; - localtime_r( &_opened , &t ); - - stringstream ss; - ss << _path << "." << ( 1900 + t.tm_year ) << "-" << t.tm_mon << "-" << t.tm_mday - << "_" << t.tm_hour << "-" << t.tm_min << "-" << t.tm_sec; - string s = ss.str(); - rename( _path.c_str() , s.c_str() ); -#endif - } - - _file = freopen( _path.c_str() , _append ? "a" : "w" , stdout ); - if ( ! _file ){ - cerr << "can't open: " << _path.c_str() << " for log file" << endl; - dbexit( EXIT_BADOPTIONS ); - assert(0); - } - _opened = time(0); - } - - private: + int status; - bool _enabled; - string _path; - bool _append; + char * niceName = abi::__cxa_demangle(typeinfo.name(), 0, 0, &status); + if ( ! niceName ) + return typeinfo.name(); - FILE * _file; - time_t _opened; - - } loggingManager; - - void initLogging( const string& lp , bool append ){ - cout << "all output going to: " << lp << endl; - loggingManager.start( lp , append ); - } - - void rotateLogs( int signal ){ - loggingManager.rotate(); + string s = niceName; + free(niceName); + return s; +#endif } - string errnostring( const char * prefix ){ - stringstream ss; - if ( prefix ) - ss << prefix << ": "; - ss << OUTPUT_ERRNO; - return ss.str(); - } } diff --git a/util/assert_util.h b/util/assert_util.h index bae3a55..018dc43 100644 --- a/util/assert_util.h +++ b/util/assert_util.h @@ -22,6 +22,11 @@ namespace mongo { + enum CommonErrorCodes { + DatabaseDifferCaseCode = 13297 , + StaleConfigInContextCode = 13388 + }; + /* these are manipulated outside of mutexes, so be careful */ struct Assertion { Assertion() { @@ -81,68 +86,84 @@ namespace mongo { }; extern AssertionCount assertionCount; + + struct ExceptionInfo { + ExceptionInfo() : msg(""),code(-1){} + ExceptionInfo( const char * m , int c ) + : msg( m ) , code( c ){ + } + ExceptionInfo( const string& m , int c ) + : msg( m ) , code( c ){ + } + + void append( BSONObjBuilder& b , const char * m = "$err" , const char * c = "code" ) const ; + + string toString() const { stringstream ss; ss << "exception: " << code << " " << msg; return ss.str(); } + + bool empty() const { return msg.empty(); } + + + string msg; + int code; + }; class DBException : public std::exception { public: - virtual const char* what() const throw() = 0; + DBException( const ExceptionInfo& ei ) : _ei(ei){} + DBException( const char * msg , int code ) : _ei(msg,code){} + DBException( const string& msg , int code ) : _ei(msg,code){} + virtual ~DBException() throw() { } + + virtual const char* what() const throw(){ return _ei.msg.c_str(); } + virtual int getCode() const { return _ei.code; } + + virtual void appendPrefix( stringstream& ss ) const { } + virtual string toString() const { - return what(); + stringstream ss; ss << getCode() << " " << what(); return ss.str(); + return ss.str(); } - virtual int getCode() = 0; - operator string() const { return toString(); } + + const ExceptionInfo& getInfo() const { return _ei; } + + protected: + ExceptionInfo _ei; }; class AssertionException : public DBException { public: - int code; - string msg; - AssertionException() { code = 0; } + + AssertionException( const ExceptionInfo& ei ) : DBException(ei){} + AssertionException( const char * msg , int code ) : DBException(msg,code){} + AssertionException( const string& msg , int code ) : DBException(msg,code){} + virtual ~AssertionException() throw() { } - virtual bool severe() { - return true; - } - virtual bool isUserAssertion() { - return false; - } - virtual int getCode(){ return code; } - virtual const char* what() const throw() { return msg.c_str(); } + + virtual bool severe() { return true; } + virtual bool isUserAssertion() { return false; } /* true if an interrupted exception - see KillCurrentOp */ bool interrupted() { - return code == 11600 || code == 11601; + return _ei.code == 11600 || _ei.code == 11601; } }; - + /* UserExceptions are valid errors that a user can cause, like out of disk space or duplicate key */ class UserException : public AssertionException { public: - UserException(int c , const string& m) { - code = c; - msg = m; - } - virtual bool severe() { - return false; - } - virtual bool isUserAssertion() { - return true; - } - virtual string toString() const { - return "userassert:" + msg; - } - }; + UserException(int c , const string& m) : AssertionException( m , c ){} + virtual bool severe() { return false; } + virtual bool isUserAssertion() { return true; } + virtual void appendPrefix( stringstream& ss ) const { ss << "userassert:"; } + }; + class MsgAssertionException : public AssertionException { public: - MsgAssertionException(int c, const char *m) { - code = c; - msg = m; - } - virtual bool severe() { - return false; - } - virtual string toString() const { - return "massert:" + msg; - } + MsgAssertionException( const ExceptionInfo& ei ) : AssertionException( ei ){} + MsgAssertionException(int c, const string& m) : AssertionException( m , c ){} + virtual bool severe() { return false; } + virtual void appendPrefix( stringstream& ss ) const { ss << "massert:"; } }; void asserted(const char *msg, const char *file, unsigned line); @@ -150,6 +171,7 @@ namespace mongo { void uasserted(int msgid, const char *msg); inline void uasserted(int msgid , string msg) { uasserted(msgid, msg.c_str()); } void uassert_nothrow(const char *msg); // reported via lasterror, but don't throw exception + void msgassertedNoTrace(int msgid, const char *msg); void msgasserted(int msgid, const char *msg); inline void msgasserted(int msgid, string msg) { msgasserted(msgid, msg.c_str()); } @@ -157,59 +179,67 @@ namespace mongo { #undef assert #endif -#define assert(_Expression) (void)( (!!(_Expression)) || (mongo::asserted(#_Expression, __FILE__, __LINE__), 0) ) +#define MONGO_assert(_Expression) (void)( (!!(_Expression)) || (mongo::asserted(#_Expression, __FILE__, __LINE__), 0) ) +#define assert MONGO_assert /* "user assert". if asserts, user did something wrong, not our code */ -//#define uassert( 10269 , _Expression) (void)( (!!(_Expression)) || (uasserted(#_Expression, __FILE__, __LINE__), 0) ) -#define uassert(msgid, msg,_Expression) (void)( (!!(_Expression)) || (mongo::uasserted(msgid, msg), 0) ) - -#define xassert(_Expression) (void)( (!!(_Expression)) || (mongo::asserted(#_Expression, __FILE__, __LINE__), 0) ) - -#define yassert 1 +#define MONGO_uassert(msgid, msg, expr) (void)( (!!(expr)) || (mongo::uasserted(msgid, msg), 0) ) +#define uassert MONGO_uassert /* warning only - keeps going */ -#define wassert(_Expression) (void)( (!!(_Expression)) || (mongo::wasserted(#_Expression, __FILE__, __LINE__), 0) ) +#define MONGO_wassert(_Expression) (void)( (!!(_Expression)) || (mongo::wasserted(#_Expression, __FILE__, __LINE__), 0) ) +#define wassert MONGO_wassert /* display a message, no context, and throw assertionexception easy way to throw an exception and log something without our stack trace display happening. */ -#define massert(msgid, msg,_Expression) (void)( (!!(_Expression)) || (mongo::msgasserted(msgid, msg), 0) ) +#define MONGO_massert(msgid, msg, expr) (void)( (!!(expr)) || (mongo::msgasserted(msgid, msg), 0) ) +#define massert MONGO_massert /* dassert is 'debug assert' -- might want to turn off for production as these could be slow. */ #if defined(_DEBUG) -#define dassert assert +# define MONGO_dassert assert #else -#define dassert(x) +# define MONGO_dassert(x) #endif +#define dassert MONGO_dassert // some special ids that we want to duplicate // > 10000 asserts // < 10000 UserException -#define ASSERT_ID_DUPKEY 11000 + enum { ASSERT_ID_DUPKEY = 11000 }; + /* throws a uassertion with an appropriate msg */ void streamNotGood( int code , string msg , std::ios& myios ); -#define ASSERT_STREAM_GOOD(msgid,msg,stream) (void)( (!!((stream).good())) || (mongo::streamNotGood(msgid, msg, stream), 0) ) + inline void assertStreamGood(unsigned msgid, string msg, std::ios& myios) { + if( !myios.good() ) streamNotGood(msgid, msg, myios); + } + + string demangleName( const type_info& typeinfo ); } // namespace mongo -#define BOOST_CHECK_EXCEPTION( expression ) \ +#define BOOST_CHECK_EXCEPTION MONGO_BOOST_CHECK_EXCEPTION +#define MONGO_BOOST_CHECK_EXCEPTION( expression ) \ try { \ expression; \ } catch ( const std::exception &e ) { \ - problem() << "caught boost exception: " << e.what() << endl; \ - assert( false ); \ + stringstream ss; \ + ss << "caught boost exception: " << e.what(); \ + msgasserted( 13294 , ss.str() ); \ } catch ( ... ) { \ massert( 10437 , "unknown boost failed" , false ); \ } -#define DESTRUCTOR_GUARD( expression ) \ +#define DESTRUCTOR_GUARD MONGO_DESTRUCTOR_GUARD +#define MONGO_DESTRUCTOR_GUARD( expression ) \ try { \ expression; \ } catch ( const std::exception &e ) { \ diff --git a/util/atomic_int.h b/util/atomic_int.h deleted file mode 100644 index de50560..0000000 --- a/util/atomic_int.h +++ /dev/null @@ -1,100 +0,0 @@ -// atomic_int.h -// atomic wrapper for unsigned - -/* Copyright 2009 10gen Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#if defined(_WIN32) -# include <windows.h> -#endif - -namespace mongo{ - - - struct AtomicUInt{ - AtomicUInt() : x(0) {} - AtomicUInt(unsigned z) : x(z) { } - volatile unsigned x; - operator unsigned() const { - return x; - } - inline AtomicUInt operator++(); // ++prefix - inline AtomicUInt operator++(int);// postfix++ - inline AtomicUInt operator--(); // --prefix - inline AtomicUInt operator--(int); // postfix-- - }; - -#if defined(_WIN32) - AtomicUInt AtomicUInt::operator++(){ - // InterlockedIncrement returns the new value - return InterlockedIncrement((volatile long*)&x); //long is 32bits in Win64 - } - AtomicUInt AtomicUInt::operator++(int){ - return InterlockedIncrement((volatile long*)&x)-1; - } - AtomicUInt AtomicUInt::operator--(){ - return InterlockedDecrement((volatile long*)&x); - } - AtomicUInt AtomicUInt::operator--(int){ - return InterlockedDecrement((volatile long*)&x)+1; - } -#elif defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4) - // this is in GCC >= 4.1 - AtomicUInt AtomicUInt::operator++(){ - return __sync_add_and_fetch(&x, 1); - } - AtomicUInt AtomicUInt::operator++(int){ - return __sync_fetch_and_add(&x, 1); - } - AtomicUInt AtomicUInt::operator--(){ - return __sync_add_and_fetch(&x, -1); - } - AtomicUInt AtomicUInt::operator--(int){ - return __sync_fetch_and_add(&x, -1); - } -#elif defined(__GNUC__) && (defined(__i386__) || defined(__x86_64__)) - // from boost 1.39 interprocess/detail/atomic.hpp - - inline unsigned atomic_int_helper(volatile unsigned *x, int val){ - int r; - asm volatile - ( - "lock\n\t" - "xadd %1, %0": - "+m"( *x ), "=r"( r ): // outputs (%0, %1) - "1"( val ): // inputs (%2 == %1) - "memory", "cc" // clobbers - ); - return r; - } - AtomicUInt AtomicUInt::operator++(){ - return atomic_int_helper(&x, 1)+1; - } - AtomicUInt AtomicUInt::operator++(int){ - return atomic_int_helper(&x, 1); - } - AtomicUInt AtomicUInt::operator--(){ - return atomic_int_helper(&x, -1)-1; - } - AtomicUInt AtomicUInt::operator--(int){ - return atomic_int_helper(&x, -1); - } -#else -# error "unsupported compiler or platform" -#endif - -} // namespace mongo diff --git a/util/background.cpp b/util/background.cpp index 4125315..a6d8290 100644 --- a/util/background.cpp +++ b/util/background.cpp @@ -15,14 +15,15 @@ * limitations under the License. */ -#include "stdafx.h" +#include "pch.h" #include "goodies.h" #include "background.h" +#include <list> namespace mongo { BackgroundJob *BackgroundJob::grab = 0; - mongo::mutex BackgroundJob::mutex; + mongo::mutex BackgroundJob::mutex("BackgroundJob"); /* static */ void BackgroundJob::thr() { @@ -31,9 +32,25 @@ namespace mongo { assert( us->state == NotStarted ); us->state = Running; grab = 0; - us->run(); + + { + string nm = us->name(); + setThreadName(nm.c_str()); + } + + try { + us->run(); + } + catch ( std::exception& e ){ + log( LL_ERROR ) << "backgroundjob error: " << e.what() << endl; + } + catch(...) { + log( LL_ERROR ) << "uncaught exception in BackgroundJob" << endl; + } us->state = Done; - if ( us->deleteSelf ) + bool delSelf = us->deleteSelf; + us->ending(); + if( delSelf ) delete us; } @@ -47,18 +64,56 @@ namespace mongo { return *this; } - bool BackgroundJob::wait(int msMax) { + bool BackgroundJob::wait(int msMax, unsigned maxsleep) { assert( state != NotStarted ); - int ms = 1; + unsigned ms = 0; Date_t start = jsTime(); while ( state != Done ) { sleepmillis(ms); - if ( ms < 1000 ) - ms = ms * 2; + if( ms*2<maxsleep ) ms*=2; if ( msMax && ( int( jsTime() - start ) > msMax) ) return false; } return true; } + void BackgroundJob::go(list<BackgroundJob*>& L) { + for( list<BackgroundJob*>::iterator i = L.begin(); i != L.end(); i++ ) + (*i)->go(); + } + + /* wait for several jobs to finish. */ + void BackgroundJob::wait(list<BackgroundJob*>& L, unsigned maxsleep) { + unsigned ms = 0; + { + x: + sleepmillis(ms); + if( ms*2<maxsleep ) ms*=2; + for( list<BackgroundJob*>::iterator i = L.begin(); i != L.end(); i++ ) { + assert( (*i)->state != NotStarted ); + if( (*i)->state != Done ) + goto x; + } + } + } + + void PeriodicBackgroundJob::run(){ + // want to handle first one differently so inShutdown is obeyed nicely + sleepmillis( _millis ); + + while ( ! inShutdown() ){ + try { + runLoop(); + } + catch ( std::exception& e ){ + log( LL_ERROR ) << "PeriodicBackgroundJob [" << name() << "] error: " << e.what() << endl; + } + catch ( ... ){ + log( LL_ERROR ) << "PeriodicBackgroundJob [" << name() << "] unknown error" << endl; + } + + sleepmillis( _millis ); + } + } + } // namespace mongo diff --git a/util/background.h b/util/background.h index c95a5bd..ee59455 100644 --- a/util/background.h +++ b/util/background.h @@ -19,31 +19,47 @@ namespace mongo { - /* object-orienty background thread dispatching. + /** object-orienty background thread dispatching. subclass and define run() - It is ok to call go() more than once -- if the previous invocation - has finished. Thus one pattern of use is to embed a backgroundjob - in your object and reuse it (or same thing with inheritance). + It is ok to call go(), that is, run the job, more than once -- if the + previous invocation has finished. Thus one pattern of use is to embed + a backgroundjob in your object and reuse it (or same thing with + inheritance). Each go() call spawns a new thread. + + note when job destructs, the thread is not terminated if still running. + generally if the thread could still be running, allocate the job dynamically + and set deleteSelf to true. + */ + /* example + class ConnectBG : public BackgroundJob { + public: + int sock; + int res; + SockAddr farEnd; + void run() { + res = ::connect(sock, farEnd.raw(), farEnd.addressSize); + } + }; */ - class BackgroundJob { + class BackgroundJob : boost::noncopyable { protected: - /* define this to do your work! */ + /** define this to do your work. + after this returns, state is set to done. + after this returns, deleted if deleteSelf true. + */ virtual void run() = 0; - + virtual string name() = 0; + virtual void ending() { } // hook for post processing if desired after everything else done. not called when deleteSelf=true public: enum State { NotStarted, Running, Done }; - State getState() const { - return state; - } - bool running() const { - return state == Running; - } + State getState() const { return state; } + bool running() const { return state == Running; } bool deleteSelf; // delete self when Done? @@ -53,14 +69,20 @@ namespace mongo { } virtual ~BackgroundJob() { } - // start job. returns before it's finished. + // starts job. returns once it is "dispatched" BackgroundJob& go(); // wait for completion. this spins with sleep() so not terribly efficient. // returns true if did not time out. // // note you can call wait() more than once if the first call times out. - bool wait(int msMax = 0); + bool wait(int msMax = 0, unsigned maxSleepInterval=1000); + + /* start several */ + static void go(list<BackgroundJob*>&); + + /* wait for several jobs to finish. */ + static void wait(list<BackgroundJob*>&, unsigned maxSleepInterval=1000); private: static BackgroundJob *grab; @@ -69,4 +91,23 @@ namespace mongo { volatile State state; }; + class PeriodicBackgroundJob : public BackgroundJob { + public: + PeriodicBackgroundJob( int millisToSleep ) + : _millis( millisToSleep ){ + } + + virtual ~PeriodicBackgroundJob(){} + + /** this gets called every millisToSleep ms */ + virtual void runLoop() = 0; + + virtual void run(); + + + private: + int _millis; + + }; + } // namespace mongo diff --git a/util/base64.cpp b/util/base64.cpp index 8d9d544..35a3aba 100644 --- a/util/base64.cpp +++ b/util/base64.cpp @@ -16,7 +16,7 @@ * limitations under the License. */ -#include "stdafx.h" +#include "pch.h" #include "base64.h" namespace mongo { diff --git a/util/builder.h b/util/builder.h deleted file mode 100644 index f9d3514..0000000 --- a/util/builder.h +++ /dev/null @@ -1,211 +0,0 @@ -/* builder.h */ - -/* Copyright 2009 10gen Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include "../stdafx.h" -#include <string.h> - -namespace mongo { - - class StringBuilder; - - class BufBuilder { - public: - BufBuilder(int initsize = 512) : size(initsize) { - if ( size > 0 ) { - data = (char *) malloc(size); - assert(data); - } else { - data = 0; - } - l = 0; - } - ~BufBuilder() { - kill(); - } - - void kill() { - if ( data ) { - free(data); - data = 0; - } - } - - void reset( int maxSize = 0 ){ - l = 0; - if ( maxSize && size > maxSize ){ - free(data); - data = (char*)malloc(maxSize); - size = maxSize; - } - - } - - /* leave room for some stuff later */ - void skip(int n) { - grow(n); - } - - /* note this may be deallocated (realloced) if you keep writing. */ - char* buf() { - return data; - } - - /* assume ownership of the buffer - you must then free it */ - void decouple() { - data = 0; - } - - template<class T> void append(T j) { - *((T*)grow(sizeof(T))) = j; - } - void append(short j) { - append<short>(j); - } - void append(int j) { - append<int>(j); - } - void append(unsigned j) { - append<unsigned>(j); - } - void append(bool j) { - append<bool>(j); - } - void append(double j) { - append<double>(j); - } - - void append(const void *src, size_t len) { - memcpy(grow(len), src, len); - } - - void append(const char *str) { - append((void*) str, strlen(str)+1); - } - - void append(const string &str) { - append( (void *)str.c_str(), str.length() + 1 ); - } - - void append( int val , int padding ){ - - } - - int len() const { - return l; - } - - void setlen( int newLen ){ - l = newLen; - } - - private: - /* returns the pre-grow write position */ - char* grow(int by) { - int oldlen = l; - l += by; - if ( l > size ) { - int a = size * 2; - if ( a == 0 ) - a = 512; - if ( l > a ) - a = l + 16 * 1024; - assert( a < 64 * 1024 * 1024 ); - data = (char *) realloc(data, a); - size= a; - } - return data + oldlen; - } - - char *data; - int l; - int size; - - friend class StringBuilder; - }; - - class StringBuilder { - public: - StringBuilder( int initsize=256 ) - : _buf( initsize ){ - } - -#define SBNUM(val,maxSize,macro) \ - int prev = _buf.l; \ - int z = sprintf( _buf.grow(maxSize) , macro , (val) ); \ - _buf.l = prev + z; \ - return *this; - - - StringBuilder& operator<<( double x ){ - SBNUM( x , 25 , "%g" ); - } - StringBuilder& operator<<( int x ){ - SBNUM( x , 11 , "%d" ); - } - StringBuilder& operator<<( unsigned x ){ - SBNUM( x , 11 , "%u" ); - } - StringBuilder& operator<<( long x ){ - SBNUM( x , 22 , "%ld" ); - } - StringBuilder& operator<<( unsigned long x ){ - SBNUM( x , 22 , "%lu" ); - } - StringBuilder& operator<<( long long x ){ - SBNUM( x , 22 , "%lld" ); - } - StringBuilder& operator<<( unsigned long long x ){ - SBNUM( x , 22 , "%llu" ); - } - StringBuilder& operator<<( short x ){ - SBNUM( x , 8 , "%hd" ); - } - StringBuilder& operator<<( char c ){ - _buf.grow( 1 )[0] = c; - return *this; - } - - void append( const char * str ){ - int x = strlen( str ); - memcpy( _buf.grow( x ) , str , x ); - } - StringBuilder& operator<<( const char * str ){ - append( str ); - return *this; - } - StringBuilder& operator<<( const string& s ){ - append( s.c_str() ); - return *this; - } - - // access - - void reset( int maxSize = 0 ){ - _buf.reset( maxSize ); - } - - string str(){ - return string(_buf.data, _buf.l); - } - - private: - BufBuilder _buf; - }; - -} // namespace mongo diff --git a/util/concurrency/list.h b/util/concurrency/list.h new file mode 100644 index 0000000..968ff4d --- /dev/null +++ b/util/concurrency/list.h @@ -0,0 +1,81 @@ +// list.h + +/** +* Copyright (C) 2008 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#pragma once + +namespace mongo { + +/* this class uses a mutex for writes, but not for reads. + we can get fancier later... + + struct Member : public List1<Member>::Base { + const char *host; + int port; + }; + List1<Member> _members; + _members.head()->next(); + +*/ +template<typename T> +class List1 : boost::noncopyable { +public: + /* next() and head() return 0 at end of list */ + + List1() : _head(0), _m("List1"), _orphans(0) { } + + class Base { + friend class List1; + T *_next; + public: + T* next() const { return _next; } + }; + + T* head() const { return _head; } + + void push(T* t) { + scoped_lock lk(_m); + t->_next = _head; + _head = t; + } + + // intentionally leak. + void orphanAll() { + _head = 0; + } + + /* t is not deleted, but is removed from the list. (orphaned) */ + void orphan(T* t) { + scoped_lock lk(_m); + T *&prev = _head; + T *n = prev; + while( n != t ) { + prev = n->_next; + n = prev; + } + prev = t->_next; + if( ++_orphans > 500 ) + log() << "warning orphans=" << _orphans << '\n'; + } + +private: + T *_head; + mutex _m; + int _orphans; +}; + +}; diff --git a/util/concurrency/msg.h b/util/concurrency/msg.h new file mode 100644 index 0000000..a5b07d3 --- /dev/null +++ b/util/concurrency/msg.h @@ -0,0 +1,61 @@ +// @file msg.h - interthread message passing + +/** +* Copyright (C) 2008 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful,b +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#pragma once + +#include <deque> +#include "task.h" + +namespace mongo { + + namespace task { + + typedef boost::function<void()> lam; + + /** typical usage is: task::fork( new Server("threadname") ); */ + class Server : public Task { + public: + /** send a message to the port */ + void send(lam); + + Server(string name) : _name(name), rq(false) { } + virtual ~Server() { } + + /** send message but block until function completes */ + void call(const lam&); + + void requeue() { rq = true; } + + protected: + /* REMINDER : for use in mongod, you will want to have this call Client::initThread(). */ + virtual void starting() { } + + private: + virtual bool initClient() { return true; } + virtual string name() { return _name; } + void doWork(); + deque<lam> d; + boost::mutex m; + boost::condition c; + string _name; + bool rq; + }; + + } + +} diff --git a/util/concurrency/mutex.h b/util/concurrency/mutex.h new file mode 100644 index 0000000..9ab3960 --- /dev/null +++ b/util/concurrency/mutex.h @@ -0,0 +1,179 @@ +// @file mutex.h + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <map> +#include <set> + +namespace mongo { + + extern bool __destroyingStatics; + class mutex; + + // only used on _DEBUG builds: + class MutexDebugger { + typedef const char * mid; // mid = mutex ID + typedef map<mid,int> Preceeding; + map< mid, int > maxNest; + boost::thread_specific_ptr< Preceeding > us; + map< mid, set<mid> > followers; + boost::mutex &x; + unsigned magic; + public: + // set these to create an assert that + // b must never be locked before a + // so + // a.lock(); b.lock(); is fine + // b.lock(); alone is fine too + // only checked on _DEBUG builds. + string a,b; + + void aBreakPoint(){} + void programEnding(); + MutexDebugger(); + void entering(mid m) { + if( magic != 0x12345678 ) return; + + Preceeding *_preceeding = us.get(); + if( _preceeding == 0 ) + us.reset( _preceeding = new Preceeding() ); + Preceeding &preceeding = *_preceeding; + + if( a == m ) { + aBreakPoint(); + if( preceeding[b.c_str()] ) { + cout << "mutex problem " << b << " was locked before " << a << endl; + assert(false); + } + } + + preceeding[m]++; + if( preceeding[m] > 1 ) { + // recursive re-locking. + if( preceeding[m] > maxNest[m] ) + maxNest[m] = preceeding[m]; + return; + } + + bool failed = false; + string err; + { + boost::mutex::scoped_lock lk(x); + followers[m]; + for( Preceeding::iterator i = preceeding.begin(); i != preceeding.end(); i++ ) { + if( m != i->first && i->second > 0 ) { + followers[i->first].insert(m); + if( followers[m].count(i->first) != 0 ){ + failed = true; + stringstream ss; + mid bad = i->first; + ss << "mutex problem" << + "\n when locking " << m << + "\n " << bad << " was already locked and should not be." + "\n set a and b above to debug.\n"; + stringstream q; + for( Preceeding::iterator i = preceeding.begin(); i != preceeding.end(); i++ ) { + if( i->first != m && i->first != bad && i->second > 0 ) + q << " " << i->first << '\n'; + } + string also = q.str(); + if( !also.empty() ) + ss << "also locked before " << m << " in this thread (no particular order):\n" << also; + err = ss.str(); + break; + } + } + } + } + if( failed ) { + cout << err << endl; + assert( 0 ); + } + } + void leaving(mid m) { + if( magic != 0x12345678 ) return; + Preceeding& preceeding = *us.get(); + preceeding[m]--; + if( preceeding[m] < 0 ) { + cout << "ERROR: lock count for " << m << " is " << preceeding[m] << endl; + assert( preceeding[m] >= 0 ); + } + } + }; + extern MutexDebugger mutexDebugger; + + // If you create a local static instance of this class, that instance will be destroyed + // before all global static objects are destroyed, so __destroyingStatics will be set + // to true before the global static variables are destroyed. + class StaticObserver : boost::noncopyable { + public: + ~StaticObserver() { __destroyingStatics = true; } + }; + + // On pthread systems, it is an error to destroy a mutex while held. Static global + // mutexes may be held upon shutdown in our implementation, and this way we avoid + // destroying them. + class mutex : boost::noncopyable { + public: +#if defined(_DEBUG) + const char *_name; +#endif + +#if defined(_DEBUG) + mutex(const char *name) + : _name(name) +#else + mutex(const char *) +#endif + { + _m = new boost::mutex(); + } + ~mutex() { + if( !__destroyingStatics ) { + delete _m; + } + } + class scoped_lock : boost::noncopyable { +#if defined(_DEBUG) + mongo::mutex *mut; +#endif + public: + scoped_lock( mongo::mutex &m ) : _l( m.boost() ) { +#if defined(_DEBUG) + mut = &m; + mutexDebugger.entering(mut->_name); +#endif + } + ~scoped_lock() { +#if defined(_DEBUG) + mutexDebugger.leaving(mut->_name); +#endif + } + boost::mutex::scoped_lock &boost() { return _l; } + private: + boost::mutex::scoped_lock _l; + }; + private: + boost::mutex &boost() { return *_m; } + boost::mutex *_m; + }; + + typedef mutex::scoped_lock scoped_lock; + typedef boost::recursive_mutex::scoped_lock recursive_scoped_lock; + +} diff --git a/util/mvar.h b/util/concurrency/mvar.h index 7d17051..7d17051 100644 --- a/util/mvar.h +++ b/util/concurrency/mvar.h diff --git a/util/concurrency/readme.txt b/util/concurrency/readme.txt new file mode 100644 index 0000000..6f308f5 --- /dev/null +++ b/util/concurrency/readme.txt @@ -0,0 +1,15 @@ +util/concurrency/ files
+
+list.h - a list class that is lock-free for reads
+rwlock.h - read/write locks (RWLock)
+msg.h - message passing between threads
+task.h - an abstraction around threads
+mutex.h - small enhancements that wrap boost::mutex
+thread_pool.h
+mvar.h + This is based on haskell's MVar synchronization primitive: + http://www.haskell.org/ghc/docs/latest/html/libraries/base-4.2.0.0/Control-Concurrent-MVar.html + It is a thread-safe queue that can hold at most one object. + You can also think of it as a box that can be either full or empty. +value.h + Atomic wrapper for values/objects that are copy constructable / assignable diff --git a/util/concurrency/rwlock.h b/util/concurrency/rwlock.h new file mode 100644 index 0000000..75169b2 --- /dev/null +++ b/util/concurrency/rwlock.h @@ -0,0 +1,220 @@ +// rwlock.h + +/* + * Copyright (C) 2010 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include "mutex.h" + +#if BOOST_VERSION >= 103500 + #define BOOST_RWLOCK +#else + + #if defined(_WIN32) + #error need boost >= 1.35 for windows + #endif + + #include <pthread.h> + +#endif + +#ifdef BOOST_RWLOCK +#include <boost/thread/shared_mutex.hpp> +#undef assert +#define assert MONGO_assert +#endif + +namespace mongo { + +#ifdef BOOST_RWLOCK + class RWLock { + boost::shared_mutex _m; + public: +#if defined(_DEBUG) + const char *_name; + RWLock(const char *name) : _name(name) { } +#else + RWLock(const char *) { } +#endif + void lock(){ + _m.lock(); +#if defined(_DEBUG) + mutexDebugger.entering(_name); +#endif + } + void unlock(){ +#if defined(_DEBUG) + mutexDebugger.leaving(_name); +#endif + _m.unlock(); + } + + void lock_shared(){ + _m.lock_shared(); + } + + void unlock_shared(){ + _m.unlock_shared(); + } + + bool lock_shared_try( int millis ){ + boost::system_time until = get_system_time(); + until += boost::posix_time::milliseconds(millis); + if( _m.timed_lock_shared( until ) ) { + return true; + } + return false; + } + + bool lock_try( int millis = 0 ){ + boost::system_time until = get_system_time(); + until += boost::posix_time::milliseconds(millis); + if( _m.timed_lock( until ) ) { +#if defined(_DEBUG) + mutexDebugger.entering(_name); +#endif + return true; + } + return false; + } + + + }; +#else + class RWLock { + pthread_rwlock_t _lock; + + inline void check( int x ){ + if( x == 0 ) + return; + log() << "pthread rwlock failed: " << x << endl; + assert( x == 0 ); + } + + public: +#if defined(_DEBUG) + const char *_name; + RWLock(const char *name) : _name(name) { +#else + RWLock(const char *) { +#endif + check( pthread_rwlock_init( &_lock , 0 ) ); + } + + ~RWLock(){ + if ( ! __destroyingStatics ){ + check( pthread_rwlock_destroy( &_lock ) ); + } + } + + void lock(){ + check( pthread_rwlock_wrlock( &_lock ) ); +#if defined(_DEBUG) + mutexDebugger.entering(_name); +#endif + } + void unlock(){ +#if defined(_DEBUG) + mutexDebugger.leaving(_name); +#endif + check( pthread_rwlock_unlock( &_lock ) ); + } + + void lock_shared(){ + check( pthread_rwlock_rdlock( &_lock ) ); + } + + void unlock_shared(){ + check( pthread_rwlock_unlock( &_lock ) ); + } + + bool lock_shared_try( int millis ){ + return _try( millis , false ); + } + + bool lock_try( int millis = 0 ){ + if( _try( millis , true ) ) { +#if defined(_DEBUG) + mutexDebugger.entering(_name); +#endif + return true; + } + return false; + } + + bool _try( int millis , bool write ){ + while ( true ) { + int x = write ? + pthread_rwlock_trywrlock( &_lock ) : + pthread_rwlock_tryrdlock( &_lock ); + + if ( x <= 0 ) { + return true; + } + + if ( millis-- <= 0 ) + return false; + + if ( x == EBUSY ){ + sleepmillis(1); + continue; + } + check(x); + } + + return false; + } + + }; + + +#endif + + class rwlock_try_write { + RWLock& _l; + public: + struct exception { }; + rwlock_try_write(RWLock& l, int millis = 0) : _l(l) { + if( !l.lock_try(millis) ) throw exception(); + } + ~rwlock_try_write() { _l.unlock(); } + }; + + /* scoped lock */ + struct rwlock { + rwlock( const RWLock& lock , bool write , bool alreadyHaveLock = false ) + : _lock( (RWLock&)lock ) , _write( write ){ + + if ( ! alreadyHaveLock ){ + if ( _write ) + _lock.lock(); + else + _lock.lock_shared(); + } + } + + ~rwlock(){ + if ( _write ) + _lock.unlock(); + else + _lock.unlock_shared(); + } + + RWLock& _lock; + bool _write; + }; +} diff --git a/util/concurrency/spin_lock.cpp b/util/concurrency/spin_lock.cpp new file mode 100644 index 0000000..b3e689a --- /dev/null +++ b/util/concurrency/spin_lock.cpp @@ -0,0 +1,66 @@ +// spin_lock.cpp + +/** +* Copyright (C) 2010 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include <time.h> +#include "spin_lock.h" + +namespace mongo { + + SpinLock::SpinLock() : _locked( false ){} + + SpinLock::~SpinLock(){} + + void SpinLock::lock(){ +#if defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4) + + // fast path + if (!_locked && !__sync_lock_test_and_set(&_locked, true)) { + return; + } + + // wait for lock + int wait = 1000; + while ((wait-- > 0) && (_locked)) {} + + // if failed to grab lock, sleep + struct timespec t; + t.tv_sec = 0; + t.tv_nsec = 5000000; + while (__sync_lock_test_and_set(&_locked, true)) { + nanosleep(&t, NULL); + } +#else + + // WARNING "TODO Missing spin lock in this platform." + +#endif + } + + void SpinLock::unlock(){ +#if defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4) + + __sync_lock_release(&_locked); + +#else + + // WARNING "TODO Missing spin lock in this platform." + +#endif + } + +} // namespace mongo diff --git a/util/concurrency/spin_lock.h b/util/concurrency/spin_lock.h new file mode 100644 index 0000000..110290d --- /dev/null +++ b/util/concurrency/spin_lock.h @@ -0,0 +1,48 @@ +// spin_lock.h + +/** +* Copyright (C) 2008 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef CONCURRENCY_SPINLOCK_HEADER +#define CONCURRENCY_SPINLOCK_HEADER + +namespace mongo { + + /** + * BIG WARNING - COMPILES, BUT NOT READY FOR USE - BIG WARNING + * + * The spinlock currently requires late GCC support + * routines. Support for other platforms will be added soon. + */ + class SpinLock{ + public: + SpinLock(); + ~SpinLock(); + + void lock(); + void unlock(); + + private: + bool _locked; + + // Non-copyable, non-assignable + SpinLock(SpinLock&); + SpinLock& operator=(SpinLock&); + }; + +} // namespace mongo + +#endif // CONCURRENCY_SPINLOCK_HEADER diff --git a/util/concurrency/task.cpp b/util/concurrency/task.cpp new file mode 100644 index 0000000..05e43e5 --- /dev/null +++ b/util/concurrency/task.cpp @@ -0,0 +1,171 @@ +// @file task.cpp + +/** +* Copyright (C) 2008 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful,b +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "pch.h" +#include "task.h" +#include "../goodies.h" +#include "../unittest.h" +#include "boost/thread/condition.hpp" + +namespace mongo { + + namespace task { + + /*void foo() { + boost::mutex m; + boost::mutex::scoped_lock lk(m); + boost::condition cond; + cond.wait(lk); + cond.notify_one(); + }*/ + + Task::Task() { + n = 0; + repeat = 0; + deleteSelf = true; + } + + void Task::halt() { repeat = 0; } + + void Task::run() { + assert( n == 0 ); + while( 1 ) { + n++; + try { + doWork(); + } + catch(...) { } + if( repeat == 0 ) + break; + sleepmillis(repeat); + if( inShutdown() ) + break; + } + } + + void Task::begin() { + go(); + } + + void fork(Task *t) { + t->begin(); + } + + void repeat(Task *t, unsigned millis) { + t->repeat = millis; + t->begin(); + } + + } +} + +#include "msg.h" + +/* task::Server */ + +namespace mongo { + namespace task { + + /* to get back a return value */ + struct Ret { + Ret() : done(false) { } + bool done; + boost::mutex m; + boost::condition c; + const lam *msg; + void f() { + (*msg)(); + done = true; + c.notify_one(); + } + }; + + void Server::call( const lam& msg ) { + Ret r; + r.msg = &msg; + lam f = boost::bind(&Ret::f, &r); + send(f); + { + boost::mutex::scoped_lock lk(r.m); + while( !r.done ) + r.c.wait(lk); + } + } + + void Server::send( lam msg ) { + { + boost::mutex::scoped_lock lk(m); + d.push_back(msg); + } + c.notify_one(); + } + + void Server::doWork() { + starting(); + while( 1 ) { + lam f; + try { + boost::mutex::scoped_lock lk(m); + while( d.empty() ) + c.wait(lk); + f = d.front(); + d.pop_front(); + } + catch(...) { + log() << "ERROR exception in Server:doWork?" << endl; + } + try { + f(); + if( rq ) { + rq = false; + { + boost::mutex::scoped_lock lk(m); + d.push_back(f); + } + } + } catch(std::exception& e) { + log() << "Server::doWork() exception " << e.what() << endl; + } catch(...) { + log() << "Server::doWork() unknown exception!" << endl; + } + } + } + + static Server *s; + static void abc(int i) { + cout << "Hello " << i << endl; + s->requeue(); + } + class TaskUnitTest : public mongo::UnitTest { + public: + virtual void run() { + lam f = boost::bind(abc, 3); + //f(); + + s = new Server("unittest"); + fork(s); + s->send(f); + + sleepsecs(30); + cout <<" done" << endl; + + } + }; // not running. taskunittest; + + } +} diff --git a/util/concurrency/task.h b/util/concurrency/task.h new file mode 100644 index 0000000..b3a2ece --- /dev/null +++ b/util/concurrency/task.h @@ -0,0 +1,72 @@ +// @file task.h + +/** +* Copyright (C) 2008 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful,b +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#pragma once + +#include "../background.h" + +namespace mongo { + + namespace task { + + /** abstraction around threads. simpler than BackgroundJob which is used behind the scenes. + allocate the Task dynamically. when the thread terminates, the Task object will delete itself. + */ + class Task : private BackgroundJob { + protected: + virtual void doWork() = 0; // implement the task here. + virtual string name() = 0; // name the threada + public: + Task(); + + /** for a repeating task, stop after current invocation ends. can be called by other threads + as long as the Task is still in scope. + */ + void halt(); + private: + unsigned n, repeat; + friend void fork(Task* t); + friend void repeat(Task* t, unsigned millis); + virtual void run(); + virtual void ending() { } + void begin(); + }; + + /** run once */ + void fork(Task *t); + + /** run doWork() over and over, with a pause between runs of millis */ + void repeat(Task *t, unsigned millis); + + /*** Example *** + inline void sample() { + class Sample : public Task { + public: + int result; + virtual void doWork() { result = 1234; } + Sample() : result(0) { } + }; + shared_ptr<Sample> q( new Sample() ); + fork(q); + cout << q->result << endl; // could print 1234 or 0. + } + */ + + } + +} diff --git a/util/concurrency/thread_pool.cpp b/util/concurrency/thread_pool.cpp new file mode 100644 index 0000000..2caac1f --- /dev/null +++ b/util/concurrency/thread_pool.cpp @@ -0,0 +1,138 @@ +/* threadpool.cpp +*/ + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "pch.h" +#include "thread_pool.h" +#include "mvar.h" + +namespace mongo{ + namespace threadpool{ + + // Worker thread + class Worker : boost::noncopyable { + public: + explicit Worker(ThreadPool& owner) + : _owner(owner) + , _is_done(true) + , _thread(boost::bind(&Worker::loop, this)) + {} + + // destructor will block until current operation is completed + // Acts as a "join" on this thread + ~Worker(){ + _task.put(Task()); + _thread.join(); + } + + void set_task(Task& func){ + assert(!func.empty()); + assert(_is_done); + _is_done = false; + + _task.put(func); + } + + private: + ThreadPool& _owner; + MVar<Task> _task; + bool _is_done; // only used for error detection + boost::thread _thread; + + void loop(){ + while (true) { + Task task = _task.take(); + if (task.empty()) + break; // ends the thread + + try { + task(); + } catch (std::exception e){ + log() << "Unhandled exception in worker thread: " << e.what() << endl;; + } catch (...){ + log() << "Unhandled non-exception in worker thread" << endl; + } + _is_done = true; + _owner.task_done(this); + } + } + }; + + ThreadPool::ThreadPool(int nThreads) + : _mutex("ThreadPool"), _tasksRemaining(0) + , _nThreads(nThreads) + { + scoped_lock lock(_mutex); + while (nThreads-- > 0){ + Worker* worker = new Worker(*this); + _freeWorkers.push_front(worker); + } + } + + ThreadPool::~ThreadPool(){ + join(); + + assert(_tasks.empty()); + + // O(n) but n should be small + assert(_freeWorkers.size() == (unsigned)_nThreads); + + while(!_freeWorkers.empty()){ + delete _freeWorkers.front(); + _freeWorkers.pop_front(); + } + } + + void ThreadPool::join(){ + scoped_lock lock(_mutex); + while(_tasksRemaining){ + _condition.wait(lock.boost()); + } + } + + void ThreadPool::schedule(Task task){ + scoped_lock lock(_mutex); + + _tasksRemaining++; + + if (!_freeWorkers.empty()){ + _freeWorkers.front()->set_task(task); + _freeWorkers.pop_front(); + }else{ + _tasks.push_back(task); + } + } + + // should only be called by a worker from the worker thread + void ThreadPool::task_done(Worker* worker){ + scoped_lock lock(_mutex); + + if (!_tasks.empty()){ + worker->set_task(_tasks.front()); + _tasks.pop_front(); + }else{ + _freeWorkers.push_front(worker); + } + + _tasksRemaining--; + + if(_tasksRemaining == 0) + _condition.notify_all(); + } + + } //namespace threadpool +} //namespace mongo diff --git a/util/thread_pool.h b/util/concurrency/thread_pool.h index d891d7d..f0fe8f1 100644 --- a/util/thread_pool.h +++ b/util/concurrency/thread_pool.h @@ -18,7 +18,7 @@ #include <boost/function.hpp> #include <boost/bind.hpp> #undef assert -#define assert xassert +#define assert MONGO_assert namespace mongo { @@ -29,7 +29,7 @@ namespace threadpool { // exported to the mongo namespace class ThreadPool : boost::noncopyable{ - public: + public: explicit ThreadPool(int nThreads=8); // blocks until all tasks are complete (tasks_remaining() == 0) @@ -41,7 +41,6 @@ namespace threadpool { // Also, new tasks could be scheduled after this returns. void join(); - // task will be copied a few times so make sure it's relatively cheap void schedule(Task task); @@ -58,10 +57,9 @@ namespace threadpool { template<typename F, typename A, typename B, typename C, typename D, typename E> void schedule(F f, A a, B b, C c, D d, E e){ schedule(boost::bind(f,a,b,c,d,e)); } - int tasks_remaining() { return _tasksRemaining; } - private: + private: mongo::mutex _mutex; boost::condition _condition; diff --git a/util/concurrency/value.h b/util/concurrency/value.h new file mode 100644 index 0000000..dabeb95 --- /dev/null +++ b/util/concurrency/value.h @@ -0,0 +1,85 @@ +/* @file value.h + concurrency helpers Atomic<T> and DiagStr +*/ + +/** +* Copyright (C) 2008 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful,b +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#pragma once + +namespace mongo { + + extern mutex _atomicMutex; + + /** atomic wrapper for a value. enters a mutex on each access. must + be copyable. + */ + template<typename T> + class Atomic : boost::noncopyable { + T val; + public: + Atomic<T>() { } + + void operator=(const T& a) { + scoped_lock lk(_atomicMutex); + val = a; } + + operator T() const { + scoped_lock lk(_atomicMutex); + return val; } + + /** example: + Atomic<int> q; + ... + { + Atomic<int>::tran t(q); + if( q.ref() > 0 ) + q.ref()--; + } + */ + class tran : private scoped_lock { + Atomic<T>& _a; + public: + tran(Atomic<T>& a) : scoped_lock(_atomicMutex), _a(a) { } + T& ref() { return _a.val; } + }; + }; + + /** this string COULD be mangled but with the double buffering, assuming writes + are infrequent, it's unlikely. thus, this is reasonable for lockless setting of + diagnostic strings, where their content isn't critical. + */ + class DiagStr { + char buf1[256]; + char buf2[256]; + char *p; + public: + DiagStr() { + memset(buf1, 0, 256); + memset(buf2, 0, 256); + p = buf1; + } + + const char * get() const { return p; } + + void set(const char *s) { + char *q = (p==buf1) ? buf2 : buf1; + strncpy(q, s, 255); + p = q; + } + }; + +} diff --git a/util/concurrency/vars.cpp b/util/concurrency/vars.cpp new file mode 100644 index 0000000..2b46946 --- /dev/null +++ b/util/concurrency/vars.cpp @@ -0,0 +1,52 @@ +// vars.cpp + +/** +* Copyright (C) 2008 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful,b +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "pch.h" +#include "value.h" +#include "mutex.h" + +namespace mongo { + + mutex _atomicMutex("_atomicMutex"); + MutexDebugger mutexDebugger; + + MutexDebugger::MutexDebugger() : + x( *(new boost::mutex()) ), magic(0x12345678) { + // optional way to debug lock order + /* + a = "a_lock"; + b = "b_lock"; + */ + } + + void MutexDebugger::programEnding() { + if( logLevel>=1 && followers.size() ) { + std::cout << followers.size() << " mutexes in program" << endl; + for( map< mid, set<mid> >::iterator i = followers.begin(); i != followers.end(); i++ ) { + cout << i->first; + if( maxNest[i->first] > 1 ) + cout << " maxNest:" << maxNest[i->first]; + cout << '\n'; + for( set<mid>::iterator j = i->second.begin(); j != i->second.end(); j++ ) + cout << " " << *j << '\n'; + } + cout.flush(); + } + } + +} diff --git a/util/debug_util.cpp b/util/debug_util.cpp index 9c2f5dc..f0a916d 100644 --- a/util/debug_util.cpp +++ b/util/debug_util.cpp @@ -15,7 +15,7 @@ * limitations under the License. */ -#include "stdafx.h" +#include "pch.h" #include "../db/cmdline.h" #include "../db/jsobj.h" diff --git a/util/debug_util.h b/util/debug_util.h index 6d633c5..7686ecc 100644 --- a/util/debug_util.h +++ b/util/debug_util.h @@ -19,7 +19,7 @@ #ifndef _WIN32 #include <signal.h> -#endif // ndef _WIN32 +#endif namespace mongo { @@ -39,42 +39,47 @@ namespace mongo { char string[400]; } *OWS; -// for now, running on win32 means development not production -- -// use this to log things just there. -#if defined(_WIN32) -#define WIN if( 1 ) -#else -#define WIN if( 0 ) -#endif - #if defined(_DEBUG) -#define DEV if( 1 ) + enum {DEBUG_BUILD = 1}; #else -#define DEV if( 0 ) + enum {DEBUG_BUILD = 0}; #endif -#define DEBUGGING if( 0 ) +#define MONGO_DEV if( DEBUG_BUILD ) +#define DEV MONGO_DEV + +#define MONGO_DEBUGGING if( 0 ) +#define DEBUGGING MONGO_DEBUGGING // The following declare one unique counter per enclosing function. // NOTE The implementation double-increments on a match, but we don't really care. -#define SOMETIMES( occasion, howOften ) for( static unsigned occasion = 0; ++occasion % howOften == 0; ) -#define OCCASIONALLY SOMETIMES( occasionally, 16 ) -#define RARELY SOMETIMES( rarely, 128 ) -#define ONCE for( static bool undone = true; undone; undone = false ) +#define MONGO_SOMETIMES( occasion, howOften ) for( static unsigned occasion = 0; ++occasion % howOften == 0; ) +#define SOMETIMES MONGO_SOMETIMES + +#define MONGO_OCCASIONALLY SOMETIMES( occasionally, 16 ) +#define OCCASIONALLY MONGO_OCCASIONALLY + +#define MONGO_RARELY SOMETIMES( rarely, 128 ) +#define RARELY MONGO_RARELY + +#define MONGO_ONCE for( static bool undone = true; undone; undone = false ) +#define ONCE MONGO_ONCE #if defined(_WIN32) -#define strcasecmp _stricmp + inline int strcasecmp(const char* s1, const char* s2) {return _stricmp(s1, s2);} #endif // Sets SIGTRAP handler to launch GDB // Noop unless on *NIX and compiled with _DEBUG void setupSIGTRAPforGDB(); -#if defined(_WIN32) - inline void breakpoint() {} //noop -#else // defined(_WIN32) - // code to raise a breakpoint in GDB + extern int tlogLevel; + inline void breakpoint(){ + if ( tlogLevel < 0 ) + return; +#ifndef _WIN32 + // code to raise a breakpoint in GDB ONCE { //prevent SIGTRAP from crashing the program if default action is specified and we are not in gdb struct sigaction current; @@ -83,10 +88,11 @@ namespace mongo { signal(SIGTRAP, SIG_IGN); } } - + raise(SIGTRAP); +#endif } -#endif // defined(_WIN32) + // conditional breakpoint inline void breakif(bool test){ diff --git a/util/embedded_builder.h b/util/embedded_builder.h index d945bfb..8ca47e5 100644 --- a/util/embedded_builder.h +++ b/util/embedded_builder.h @@ -18,6 +18,7 @@ #pragma once namespace mongo { + // utility class for assembling hierarchical objects class EmbeddedBuilder { public: @@ -49,7 +50,7 @@ namespace mongo { return; } prepareContext( name ); - back()->appendAs( e, name.c_str() ); + back()->appendAs( e, name ); } BufBuilder &subarrayStartAs( string name ) { prepareContext( name ); diff --git a/util/file.h b/util/file.h index 347e2d6..0302290 100644 --- a/util/file.h +++ b/util/file.h @@ -27,6 +27,8 @@ #include <windows.h> #endif +#include "text.h" + namespace mongo { #ifndef __sunos__ @@ -47,7 +49,6 @@ public: #if defined(_WIN32) #include <io.h> -std::wstring toWideString(const char *s); class File : public FileInterface { HANDLE fd; @@ -68,9 +69,9 @@ public: fd = INVALID_HANDLE_VALUE; } void open(const char *filename, bool readOnly=false ) { - std::wstring filenamew = toWideString(filename); fd = CreateFile( - filenamew.c_str(), ( readOnly ? 0 : GENERIC_WRITE ) | GENERIC_READ, FILE_SHARE_READ, + toNativeString(filename).c_str(), + ( readOnly ? 0 : GENERIC_WRITE ) | GENERIC_READ, FILE_SHARE_READ, NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL); if( !is_open() ) { out() << "CreateFile failed " << filename << endl; @@ -118,7 +119,7 @@ class File : public FileInterface { void err(bool ok) { if( !ok && !_bad ) { _bad = true; - log() << "File I/O " << OUTPUT_ERRNO << '\n'; + log() << "File I/O " << errnoWithDescription() << '\n'; } } public: @@ -137,9 +138,11 @@ public: #endif void open(const char *filename, bool readOnly=false ) { - fd = ::open(filename, O_CREAT | ( readOnly ? 0 : O_RDWR ) | O_NOATIME, S_IRUSR | S_IWUSR); + fd = ::open(filename, + O_CREAT | ( readOnly ? 0 : ( O_RDWR | O_NOATIME ) ) , + S_IRUSR | S_IWUSR); if ( fd <= 0 ) { - out() << "couldn't open " << filename << ' ' << OUTPUT_ERRNO << endl; + out() << "couldn't open " << filename << ' ' << errnoWithDescription() << endl; return; } _bad = false; diff --git a/util/file_allocator.h b/util/file_allocator.h index 93b2b1c..b0267d9 100644 --- a/util/file_allocator.h +++ b/util/file_allocator.h @@ -1,4 +1,4 @@ -//file_allocator.h +// @file file_allocator.h /* Copyright 2009 10gen Inc. * @@ -15,10 +15,10 @@ * limitations under the License. */ -#include "../stdafx.h" +#include "../pch.h" #include <fcntl.h> #include <errno.h> -#if defined(__freebsd__) +#if defined(__freebsd__) || defined(__openbsd__) #include <sys/stat.h> #endif @@ -38,7 +38,7 @@ namespace mongo { */ public: #if !defined(_WIN32) - FileAllocator() : failed_() {} + FileAllocator() : pendingMutex_("FileAllocator"), failed_() {} #endif void start() { #if !defined(_WIN32) @@ -106,6 +106,53 @@ namespace mongo { #endif } + static void ensureLength( int fd , long size ){ + +#if defined(_WIN32) + // we don't zero on windows + // TODO : we should to avoid fragmentation +#else + +#if defined(__linux__) + int ret = posix_fallocate(fd,0,size); + if ( ret == 0 ) + return; + + log() << "posix_fallocate failed: " << errnoWithDescription( ret ) << " falling back" << endl; +#endif + + off_t filelen = lseek(fd, 0, SEEK_END); + if ( filelen < size ) { + if (filelen != 0) { + stringstream ss; + ss << "failure creating new datafile; lseek failed for fd " << fd << " with errno: " << errnoWithDescription(); + massert( 10440 , ss.str(), filelen == 0 ); + } + // Check for end of disk. + massert( 10441 , "Unable to allocate file of desired size", + size - 1 == lseek(fd, size - 1, SEEK_SET) ); + massert( 10442 , "Unable to allocate file of desired size", + 1 == write(fd, "", 1) ); + lseek(fd, 0, SEEK_SET); + + const long z = 256 * 1024; + const boost::scoped_array<char> buf_holder (new char[z]); + char* buf = buf_holder.get(); + memset(buf, 0, z); + long left = size; + while ( left > 0 ) { + long towrite = left; + if ( towrite > z ) + towrite = z; + + int written = write( fd , buf , towrite ); + massert( 10443 , errnoWithPrefix("write failed" ), written > 0 ); + left -= written; + } + } +#endif + } + private: #if !defined(_WIN32) void checkFailure() { @@ -161,42 +208,26 @@ namespace mongo { long fd = open(name.c_str(), O_CREAT | O_RDWR | O_NOATIME, S_IRUSR | S_IWUSR); if ( fd <= 0 ) { stringstream ss; - ss << "couldn't open " << name << ' ' << OUTPUT_ERRNO; + ss << "couldn't open " << name << ' ' << errnoWithDescription(); massert( 10439 , ss.str(), fd <= 0 ); } #if defined(POSIX_FADV_DONTNEED) if( posix_fadvise(fd, 0, size, POSIX_FADV_DONTNEED) ) { - log() << "warning: posix_fadvise fails " << name << ' ' << OUTPUT_ERRNO << endl; + log() << "warning: posix_fadvise fails " << name << ' ' << errnoWithDescription() << endl; } #endif - + + Timer t; + /* make sure the file is the full desired length */ - off_t filelen = lseek(fd, 0, SEEK_END); - if ( filelen < size ) { - massert( 10440 , "failure creating new datafile", filelen == 0 ); - // Check for end of disk. - massert( 10441 , "Unable to allocate file of desired size", - size - 1 == lseek(fd, size - 1, SEEK_SET) ); - massert( 10442 , "Unable to allocate file of desired size", - 1 == write(fd, "", 1) ); - lseek(fd, 0, SEEK_SET); - Timer t; - long z = 256 * 1024; - char buf[z]; - memset(buf, 0, z); - long left = size; - while ( left > 0 ) { - long towrite = left; - if ( towrite > z ) - towrite = z; - - int written = write( fd , buf , towrite ); - massert( 10443 , errnostring("write failed" ), written > 0 ); - left -= written; - } - log() << "done allocating datafile " << name << ", size: " << size/1024/1024 << "MB, took " << ((double)t.millis())/1000.0 << " secs" << endl; - } + ensureLength( fd , size ); + + log() << "done allocating datafile " << name << ", " + << "size: " << size/1024/1024 << "MB, " + << " took " << ((double)t.millis())/1000.0 << " secs" + << endl; + close( fd ); } catch ( ... ) { diff --git a/util/goodies.h b/util/goodies.h index cd5423b..1dfabdc 100644 --- a/util/goodies.h +++ b/util/goodies.h @@ -1,4 +1,4 @@ -// goodies.h +// @file goodies.h // miscellaneous junk /* Copyright 2009 10gen Inc. @@ -18,13 +18,22 @@ #pragma once -#if defined(_WIN32) -# include <windows.h> -#endif +#include "../bson/util/misc.h" +#include "concurrency/mutex.h" namespace mongo { -#if !defined(_WIN32) && !defined(NOEXECINFO) && !defined(__freebsd__) && !defined(__sun__) + void setThreadName(const char * name); + string getThreadName(); + + template<class T> + inline string ToString(const T& t) { + stringstream s; + s << t; + return s.str(); + } + +#if !defined(_WIN32) && !defined(NOEXECINFO) && !defined(__freebsd__) && !defined(__openbsd__) && !defined(__sun__) } // namespace mongo @@ -105,15 +114,14 @@ namespace mongo { } // PRINT(2+2); prints "2+2: 4" -#define PRINT(x) cout << #x ": " << (x) << endl +#define MONGO_PRINT(x) cout << #x ": " << (x) << endl +#define PRINT MONGO_PRINT // PRINTFL; prints file:line -#define PRINTFL cout << __FILE__ ":" << __LINE__ << endl - -#undef yassert +#define MONGO_PRINTFL cout << __FILE__ ":" << __LINE__ << endl +#define PRINTFL MONGO_PRINTFL #undef assert -#define assert xassert -#define yassert 1 +#define assert MONGO_assert struct WrappingInt { WrappingInt() { @@ -139,22 +147,6 @@ namespace mongo { } }; -} // namespace mongo - -#include <ctime> - -namespace mongo { - - inline void time_t_to_String(time_t t, char *buf) { -#if defined(_WIN32) - ctime_s(buf, 64, &t); -#else - ctime_r(&t, buf); -#endif - buf[24] = 0; // don't want the \n - } - - inline void time_t_to_Struct(time_t t, struct tm * buf , bool local = false ) { #if defined(_WIN32) if ( local ) @@ -169,12 +161,26 @@ namespace mongo { #endif } + // uses ISO 8601 dates without trailing Z + // colonsOk should be false when creating filenames + inline string terseCurrentTime(bool colonsOk=true){ + struct tm t; + time_t_to_Struct( time(0) , &t ); + const char* fmt = (colonsOk ? "%Y-%m-%dT%H:%M:%S" : "%Y-%m-%dT%H-%M-%S"); + char buf[32]; + assert(strftime(buf, sizeof(buf), fmt, &t) == 19); + return buf; + } -#define asctime _asctime_not_threadsafe_ -#define gmtime _gmtime_not_threadsafe_ -#define localtime _localtime_not_threadsafe_ -#define ctime _ctime_is_not_threadsafe_ +#define MONGO_asctime _asctime_not_threadsafe_ +#define asctime MONGO_asctime +#define MONGO_gmtime _gmtime_not_threadsafe_ +#define gmtime MONGO_gmtime +#define MONGO_localtime _localtime_not_threadsafe_ +#define localtime MONGO_localtime +#define MONGO_ctime _ctime_is_not_threadsafe_ +#define ctime MONGO_ctime #if defined(_WIN32) || defined(__sunos__) inline void sleepsecs(int s) { @@ -183,24 +189,24 @@ namespace mongo { xt.sec += s; boost::thread::sleep(xt); } - inline void sleepmillis(int s) { + inline void sleepmillis(long long s) { boost::xtime xt; boost::xtime_get(&xt, boost::TIME_UTC); - xt.sec += ( s / 1000 ); - xt.nsec += ( s % 1000 ) * 1000000; + xt.sec += (int)( s / 1000 ); + xt.nsec += (int)(( s % 1000 ) * 1000000); if ( xt.nsec >= 1000000000 ) { xt.nsec -= 1000000000; xt.sec++; } boost::thread::sleep(xt); } - inline void sleepmicros(int s) { + inline void sleepmicros(long long s) { if ( s <= 0 ) return; boost::xtime xt; boost::xtime_get(&xt, boost::TIME_UTC); - xt.sec += ( s / 1000000 ); - xt.nsec += ( s % 1000000 ) * 1000; + xt.sec += (int)( s / 1000000 ); + xt.nsec += (int)(( s % 1000000 ) * 1000); if ( xt.nsec >= 1000000000 ) { xt.nsec -= 1000000000; xt.sec++; @@ -216,22 +222,23 @@ namespace mongo { cout << "nanosleep failed" << endl; } } - inline void sleepmicros(int s) { + inline void sleepmicros(long long s) { if ( s <= 0 ) return; struct timespec t; t.tv_sec = (int)(s / 1000000); - t.tv_nsec = s % 1000000; - if ( nanosleep( &t , 0 ) ){ + t.tv_nsec = 1000 * ( s % 1000000 ); + struct timespec out; + if ( nanosleep( &t , &out ) ){ cout << "nanosleep failed" << endl; } } - inline void sleepmillis(int s) { + inline void sleepmillis(long long s) { sleepmicros( s * 1000 ); } #endif -// note this wraps + // note this wraps inline int tdiff(unsigned told, unsigned tnew) { return WrappingInt::diff(tnew, told); } @@ -242,15 +249,6 @@ namespace mongo { return (xt.sec & 0xfffff) * 1000 + t; } - struct Date_t { - // TODO: make signed (and look for related TODO's) - unsigned long long millis; - Date_t(): millis(0) {} - Date_t(unsigned long long m): millis(m) {} - operator unsigned long long&() { return millis; } - operator const unsigned long long&() const { return millis; } - }; - inline Date_t jsTime() { boost::xtime xt; boost::xtime_get(&xt, boost::TIME_UTC); @@ -273,44 +271,7 @@ namespace mongo { unsigned secs = xt.sec % 1024; return secs*1000000 + t; } - using namespace boost; - - extern bool __destroyingStatics; - - // If you create a local static instance of this class, that instance will be destroyed - // before all global static objects are destroyed, so __destroyingStatics will be set - // to true before the global static variables are destroyed. - class StaticObserver : boost::noncopyable { - public: - ~StaticObserver() { __destroyingStatics = true; } - }; - - // On pthread systems, it is an error to destroy a mutex while held. Static global - // mutexes may be held upon shutdown in our implementation, and this way we avoid - // destroying them. - class mutex : boost::noncopyable { - public: - mutex() { new (_buf) boost::mutex(); } - ~mutex() { - if( !__destroyingStatics ) { - boost().boost::mutex::~mutex(); - } - } - class scoped_lock : boost::noncopyable { - public: - scoped_lock( mongo::mutex &m ) : _l( m.boost() ) {} - boost::mutex::scoped_lock &boost() { return _l; } - private: - boost::mutex::scoped_lock _l; - }; - private: - boost::mutex &boost() { return *( boost::mutex * )( _buf ); } - char _buf[ sizeof( boost::mutex ) ]; - }; - typedef mongo::mutex::scoped_lock scoped_lock; - typedef boost::recursive_mutex::scoped_lock recursive_scoped_lock; - // simple scoped timer class Timer { public: @@ -320,17 +281,17 @@ namespace mongo { Timer( unsigned long long start ) { old = start; } - int seconds(){ + int seconds() const { return (int)(micros() / 1000000); } - int millis() { + int millis() const { return (long)(micros() / 1000); } - unsigned long long micros() { + unsigned long long micros() const { unsigned long long n = curTimeMicros64(); return n - old; } - unsigned long long micros(unsigned long long & n) { // returns cur time in addition to timer result + unsigned long long micros(unsigned long long & n) const { // returns cur time in addition to timer result n = curTimeMicros64(); return n - old; } @@ -364,6 +325,7 @@ namespace mongo { if ( strlen(str) < l ) return false; return strncmp(str, prefix, l) == 0; } + inline bool startsWith(string s, string p) { return startsWith(s.c_str(), p.c_str()); } inline bool endsWith(const char *p, const char *suffix) { size_t a = strlen(p); @@ -372,12 +334,6 @@ namespace mongo { return strcmp(p + a - b, suffix) == 0; } -} // namespace mongo - -#include "boost/detail/endian.hpp" - -namespace mongo { - inline unsigned long swapEndian(unsigned long x) { return ((x & 0xff) << 24) | @@ -395,15 +351,6 @@ namespace mongo { return swapEndian(x); } #endif - - // Like strlen, but only scans up to n bytes. - // Returns -1 if no '0' found. - inline int strnlen( const char *s, int n ) { - for( int i = 0; i < n; ++i ) - if ( !s[ i ] ) - return i; - return -1; - } #if !defined(_WIN32) typedef int HANDLE; @@ -446,7 +393,7 @@ namespace mongo { boost::thread_specific_ptr<T> _val; }; - class ProgressMeter { + class ProgressMeter : boost::noncopyable { public: ProgressMeter( long long total , int secondsBetween = 3 , int checkInterval = 100 ){ reset( total , secondsBetween , checkInterval ); @@ -513,6 +460,10 @@ namespace mongo { buf << _done << "/" << _total << " " << (_done*100)/_total << "%"; return buf.str(); } + + bool operator==( const ProgressMeter& other ) const { + return this == &other; + } private: bool _active; @@ -526,9 +477,39 @@ namespace mongo { int _lastTime; }; + class ProgressMeterHolder : boost::noncopyable { + public: + ProgressMeterHolder( ProgressMeter& pm ) + : _pm( pm ){ + } + + ~ProgressMeterHolder(){ + _pm.finished(); + } + + ProgressMeter* operator->(){ + return &_pm; + } + + bool hit( int n = 1 ){ + return _pm.hit( n ); + } + + void finished(){ + _pm.finished(); + } + + bool operator==( const ProgressMeter& other ){ + return _pm == other; + } + + private: + ProgressMeter& _pm; + }; + class TicketHolder { public: - TicketHolder( int num ){ + TicketHolder( int num ) : _mutex("TicketHolder") { _outof = num; _num = num; } @@ -611,7 +592,7 @@ namespace mongo { _buf = 0; } - operator string() const { + string toString() const { string s = _buf; return s; } @@ -634,11 +615,11 @@ namespace mongo { } bool operator!=( const char * str ) const { - return strcmp( _buf , str ); + return strcmp( _buf , str ) != 0; } bool empty() const { - return _buf[0] == 0; + return _buf == 0 || _buf[0] == 0; } private: @@ -651,6 +632,20 @@ namespace mongo { inline bool isNumber( char c ) { return c >= '0' && c <= '9'; } + + inline unsigned stringToNum(const char *str) { + unsigned x = 0; + const char *p = str; + while( 1 ) { + if( !isNumber(*p) ) { + if( *p == 0 && p != str ) + break; + throw 0; + } + x = x * 10 + *p++ - '0'; + } + return x; + } // for convenience, '{' is greater than anything and stops number parsing inline int lexNumCmp( const char *s1, const char *s2 ) { @@ -703,5 +698,46 @@ namespace mongo { return -1; return 0; } - + + /** A generic pointer type for function arguments. + * It will convert from any pointer type except auto_ptr. + * Semantics are the same as passing the pointer returned from get() + * const ptr<T> => T * const + * ptr<const T> => T const * or const T* + */ + template <typename T> + struct ptr{ + + ptr() : _p(NULL) {} + + // convert to ptr<T> + ptr(T* p) : _p(p) {} // needed for NULL + template<typename U> ptr(U* p) : _p(p) {} + template<typename U> ptr(const ptr<U>& p) : _p(p) {} + template<typename U> ptr(const boost::shared_ptr<U>& p) : _p(p.get()) {} + template<typename U> ptr(const boost::scoped_ptr<U>& p) : _p(p.get()) {} + //template<typename U> ptr(const auto_ptr<U>& p) : _p(p.get()) {} + + // assign to ptr<T> + ptr& operator= (T* p) { _p = p; return *this; } // needed for NULL + template<typename U> ptr& operator= (U* p) { _p = p; return *this; } + template<typename U> ptr& operator= (const ptr<U>& p) { _p = p; return *this; } + template<typename U> ptr& operator= (const boost::shared_ptr<U>& p) { _p = p.get(); return *this; } + template<typename U> ptr& operator= (const boost::scoped_ptr<U>& p) { _p = p.get(); return *this; } + //template<typename U> ptr& operator= (const auto_ptr<U>& p) { _p = p.get(); return *this; } + + // use + T* operator->() const { return _p; } + T& operator*() const { return *_p; } + + // convert from ptr<T> + operator T* () const { return _p; } + + private: + T* _p; + }; + + /** Hmmmm */ + using namespace boost; + } // namespace mongo diff --git a/util/hashtab.h b/util/hashtab.h index d46591c..16c5483 100644 --- a/util/hashtab.h +++ b/util/hashtab.h @@ -22,7 +22,7 @@ #pragma once -#include "../stdafx.h" +#include "../pch.h" #include <map> namespace mongo { @@ -36,7 +36,8 @@ namespace mongo { template < class Key, - class Type + class Type, + class PTR > class HashTable : boost::noncopyable { public: @@ -51,10 +52,15 @@ namespace mongo { void setUnused() { hash = 0; } - } *nodes; + }; + PTR _buf; int n; int maxChain; + Node& nodes(int i) { + return *((Node*) _buf.at(i * sizeof(Node), sizeof(Node))); + } + int _find(const Key& k, bool& found) { found = false; int h = k.hash(); @@ -63,12 +69,12 @@ namespace mongo { int chain = 0; int firstNonUsed = -1; while ( 1 ) { - if ( !nodes[i].inUse() ) { + if ( !nodes(i).inUse() ) { if ( firstNonUsed < 0 ) firstNonUsed = i; } - if ( nodes[i].hash == h && nodes[i].k == k ) { + if ( nodes(i).hash == h && nodes(i).k == k ) { if ( chain >= 200 ) out() << "warning: hashtable " << name << " long chain " << endl; found = true; @@ -92,24 +98,28 @@ namespace mongo { public: /* buf must be all zeroes on initialization. */ - HashTable(void *buf, int buflen, const char *_name) : name(_name) { + HashTable(PTR buf, int buflen, const char *_name) : name(_name) { int m = sizeof(Node); // out() << "hashtab init, buflen:" << buflen << " m:" << m << endl; n = buflen / m; if ( (n & 1) == 0 ) n--; maxChain = (int) (n * 0.05); - nodes = (Node *) buf; + _buf = buf; + //nodes = (Node *) buf; + + if ( sizeof(Node) != 628 ){ + out() << "HashTable() " << _name << " sizeof(node):" << sizeof(Node) << " n:" << n << " sizeof(Key): " << sizeof(Key) << " sizeof(Type):" << sizeof(Type) << endl; + assert( sizeof(Node) == 628 ); + } - assert( sizeof(Node) == 628 ); - //out() << "HashTable() " << _name << " sizeof(node):" << sizeof(Node) << " n:" << n << endl; } Type* get(const Key& k) { bool found; int i = _find(k, found); if ( found ) - return &nodes[i].value; + return &nodes(i).value; return 0; } @@ -117,8 +127,9 @@ namespace mongo { bool found; int i = _find(k, found); if ( i >= 0 && found ) { - nodes[i].k.kill(); - nodes[i].setUnused(); + Node& n = nodes(i); + n.k.kill(); + n.setUnused(); } } /* @@ -136,24 +147,34 @@ namespace mongo { int i = _find(k, found); if ( i < 0 ) return false; + Node& n = nodes(i); if ( !found ) { - nodes[i].k = k; - nodes[i].hash = k.hash(); + n.k = k; + n.hash = k.hash(); } else { - assert( nodes[i].hash == k.hash() ); + assert( n.hash == k.hash() ); } - nodes[i].value = value; + n.value = value; return true; } typedef void (*IteratorCallback)( const Key& k , Type& v ); - void iterAll( IteratorCallback callback ){ for ( int i=0; i<n; i++ ){ - if ( ! nodes[i].inUse() ) + if ( ! nodes(i).inUse() ) + continue; + callback( nodes(i).k , nodes(i).value ); + } + } + + // TODO: should probably use boost::bind for this, but didn't want to look at it + typedef void (*IteratorCallback2)( const Key& k , Type& v , void * extra ); + void iterAll( IteratorCallback2 callback , void * extra ){ + for ( int i=0; i<n; i++ ){ + if ( ! nodes(i).inUse() ) continue; - callback( nodes[i].k , nodes[i].value ); + callback( nodes(i).k , nodes(i).value , extra ); } } @@ -32,4 +32,36 @@ namespace mongo { inline char fromHex( const char *c ) { return ( fromHex( c[ 0 ] ) << 4 ) | fromHex( c[ 1 ] ); } + + inline string toHex(const void* inRaw, int len){ + static const char hexchars[] = "0123456789ABCDEF"; + + StringBuilder out; + const char* in = reinterpret_cast<const char*>(inRaw); + for (int i=0; i<len; ++i){ + char c = in[i]; + char hi = hexchars[(c & 0xF0) >> 4]; + char lo = hexchars[(c & 0x0F)]; + + out << hi << lo; + } + + return out.str(); + } + + inline string toHexLower(const void* inRaw, int len){ + static const char hexchars[] = "0123456789abcdef"; + + StringBuilder out; + const char* in = reinterpret_cast<const char*>(inRaw); + for (int i=0; i<len; ++i){ + char c = in[i]; + char hi = hexchars[(c & 0xF0) >> 4]; + char lo = hexchars[(c & 0x0F)]; + + out << hi << lo; + } + + return out.str(); + } } diff --git a/util/histogram.cpp b/util/histogram.cpp new file mode 100644 index 0000000..4541dfd --- /dev/null +++ b/util/histogram.cpp @@ -0,0 +1,129 @@ +// histogram.cc + +/** +* Copyright (C) 2010 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include <iomanip> +#include <limits> +#include <sstream> + +#include "histogram.h" + +namespace mongo { + + using std::ostringstream; + using std::setfill; + using std::setw; + + Histogram::Histogram( const Options& opts ) + : _initialValue( opts.initialValue ) + , _numBuckets( opts.numBuckets ) + , _boundaries( new uint32_t[_numBuckets] ) + , _buckets( new uint64_t[_numBuckets] ){ + + // TODO more sanity checks + // + not too few buckets + // + initialBucket and bucketSize fit within 32 bit ints + + // _boundaries store the maximum value falling in that bucket. + if ( opts.exponential ){ + uint32_t twoPow = 1; // 2^0 + for ( uint32_t i = 0; i < _numBuckets - 1; i++){ + _boundaries[i] = _initialValue + opts.bucketSize * twoPow; + twoPow *= 2; // 2^i+1 + } + } else { + _boundaries[0] = _initialValue + opts.bucketSize; + for ( uint32_t i = 1; i < _numBuckets - 1; i++ ){ + _boundaries[i] = _boundaries[ i-1 ] + opts.bucketSize; + } + } + _boundaries[ _numBuckets-1 ] = std::numeric_limits<uint32_t>::max(); + + for ( uint32_t i = 0; i < _numBuckets; i++ ) { + _buckets[i] = 0; + } + } + + Histogram::~Histogram() { + delete [] _boundaries; + delete [] _buckets; + } + + void Histogram::insert( uint32_t element ){ + if ( element < _initialValue) return; + + _buckets[ _findBucket(element) ] += 1; + } + + string Histogram::toHTML() const{ + uint64_t max = 0; + for ( uint32_t i = 0; i < _numBuckets; i++ ){ + if ( _buckets[i] > max ){ + max = _buckets[i]; + } + } + if ( max == 0 ) { + return "histogram is empty\n"; + } + + // normalize buckets to max + const int maxBar = 20; + ostringstream ss; + for ( uint32_t i = 0; i < _numBuckets; i++ ){ + int barSize = _buckets[i] * maxBar / max; + ss << string( barSize,'*' ) + << setfill(' ') << setw( maxBar-barSize + 12 ) + << _boundaries[i] << '\n'; + } + + return ss.str(); + } + + uint64_t Histogram::getCount( uint32_t bucket ) const { + if ( bucket >= _numBuckets ) return 0; + + return _buckets[ bucket ]; + } + + uint32_t Histogram::getBoundary( uint32_t bucket ) const { + if ( bucket >= _numBuckets ) return 0; + + return _boundaries[ bucket ]; + } + + uint32_t Histogram::getBucketsNum() const { + return _numBuckets; + } + + uint32_t Histogram::_findBucket( uint32_t element ) const{ + // TODO assert not too small a value? + + uint32_t low = 0; + uint32_t high = _numBuckets - 1; + while ( low < high ){ + // low + ( (high - low) / 2 ); + uint32_t mid = ( low + high ) >> 1; + if ( element > _boundaries[ mid ] ){ + low = mid + 1; + } else { + high = mid; + } + } + return low; + } + +} // namespace mongo diff --git a/util/histogram.h b/util/histogram.h new file mode 100644 index 0000000..d4a6fa7 --- /dev/null +++ b/util/histogram.h @@ -0,0 +1,128 @@ +// histogram.h + +/** +* Copyright (C) 2008 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef UTIL_HISTOGRAM_HEADER +#define UTIL_HISTOGRAM_HEADER + +#include "../pch.h" + +#include <string> + +namespace mongo { + + using std::string; + + /** + * A histogram for a 32-bit integer range. + */ + class Histogram { + public: + /** + * Construct a histogram with 'numBuckets' buckets, optionally + * having the first bucket start at 'initialValue' rather than + * 0. By default, the histogram buckets will be 'bucketSize' wide. + * + * Usage example: + * Histogram::Options opts; + * opts.numBuckets = 3; + * opts.bucketSize = 10; + * Histogram h( opts ); + * + * Generates the bucket ranges [0..10],[11..20],[21..max_int] + * + * Alternatively, the flag 'exponential' could be turned on, in + * which case a bucket's maximum value will be + * initialValue + bucketSize * 2 ^ [0..numBuckets-1] + * + * Usage example: + * Histogram::Options opts; + * opts.numBuckets = 4; + * opts.bucketSize = 125; + * opts.exponential = true; + * Histogram h( opts ); + * + * Generates the bucket ranges [0..125],[126..250],[251..500],[501..max_int] + */ + struct Options { + boost::uint32_t numBuckets; + boost::uint32_t bucketSize; + boost::uint32_t initialValue; + + // use exponential buckets? + bool exponential; + + Options() + : numBuckets(0) + , bucketSize(0) + , initialValue(0) + , exponential(false){} + }; + explicit Histogram( const Options& opts ); + ~Histogram(); + + /** + * Find the bucket that 'element' falls into and increment its count. + */ + void insert( boost::uint32_t element ); + + /** + * Render the histogram as string that can be used inside an + * HTML doc. + */ + string toHTML() const; + + // testing interface below -- consider it private + + /** + * Return the count for the 'bucket'-th bucket. + */ + boost::uint64_t getCount( boost::uint32_t bucket ) const; + + /** + * Return the maximum element that would fall in the + * 'bucket'-th bucket. + */ + boost::uint32_t getBoundary( boost::uint32_t bucket ) const; + + /** + * Return the number of buckets in this histogram. + */ + boost::uint32_t getBucketsNum() const; + + private: + /** + * Returns the bucket where 'element' should fall + * into. Currently assumes that 'element' is greater than the + * minimum 'inialValue'. + */ + boost::uint32_t _findBucket( boost::uint32_t element ) const; + + boost::uint32_t _initialValue; // no value lower than it is recorded + boost::uint32_t _numBuckets; // total buckets in the histogram + + // all below owned here + boost::uint32_t* _boundaries; // maximum element of each bucket + boost::uint64_t* _buckets; // current count of each bucket + + Histogram( const Histogram& ); + Histogram& operator=( const Histogram& ); + }; + +} // namespace mongo + +#endif // UTIL_HISTOGRAM_HEADER diff --git a/util/hostandport.h b/util/hostandport.h new file mode 100644 index 0000000..6124570 --- /dev/null +++ b/util/hostandport.h @@ -0,0 +1,142 @@ +// hostandport.h + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "sock.h" +#include "../db/cmdline.h" +#include "mongoutils/str.h" + +namespace mongo { + + using namespace mongoutils; + + /** helper for manipulating host:port connection endpoints. + */ + struct HostAndPort { + HostAndPort() : _port(-1) { } + + /** From a string hostname[:portnumber] + Throws user assertion if bad config string or bad port #. + */ + HostAndPort(string s); + + /** @param p port number. -1 is ok to use default. */ + HostAndPort(string h, int p /*= -1*/) : _host(h), _port(p) { } + + HostAndPort(const SockAddr& sock ) + : _host( sock.getAddr() ) , _port( sock.getPort() ){ + } + + static HostAndPort me() { + return HostAndPort("localhost", cmdLine.port); + } + + /* uses real hostname instead of localhost */ + static HostAndPort Me(); + + bool operator<(const HostAndPort& r) const { + if( _host < r._host ) + return true; + if( _host == r._host ) + return port() < r.port(); + return false; + } + + bool operator==(const HostAndPort& r) const { + return _host == r._host && port() == r.port(); + } + + /* returns true if the host/port combo identifies this process instance. */ + bool isSelf() const; // defined in message.cpp + + bool isLocalHost() const; + + // @returns host:port + string toString() const; + + operator string() const { return toString(); } + + string host() const { return _host; } + + int port() const { return _port >= 0 ? _port : CmdLine::DefaultDBPort; } + bool hasPort() const { return _port >= 0; } + void setPort( int port ) { _port = port; } + + private: + // invariant (except full obj assignment): + string _host; + int _port; // -1 indicates unspecified + }; + + /** returns true if strings seem to be the same hostname. + "nyc1" and "nyc1.acme.com" are treated as the same. + in fact "nyc1.foo.com" and "nyc1.acme.com" are treated the same - + we oly look up to the first period. + */ + inline bool sameHostname(const string& a, const string& b) { + return str::before(a, '.') == str::before(b, '.'); + } + + inline HostAndPort HostAndPort::Me() { + string h = getHostName(); + assert( !h.empty() ); + assert( h != "localhost" ); + return HostAndPort(h, cmdLine.port); + } + + inline string HostAndPort::toString() const { + stringstream ss; + ss << _host; + if ( _port != -1 ){ + ss << ':'; +#if defined(_DEBUG) + if( _port >= 44000 && _port < 44100 ) { + log() << "warning: special debug port 44xxx used" << endl; + ss << _port+1; + } + else + ss << _port; +#else + ss << _port; +#endif + } + return ss.str(); + } + + inline bool HostAndPort::isLocalHost() const { + return _host == "localhost" || startsWith(_host.c_str(), "127.") || _host == "::1"; + } + + inline HostAndPort::HostAndPort(string s) { + const char *p = s.c_str(); + uassert(13110, "HostAndPort: bad config string", *p); + const char *colon = strrchr(p, ':'); + if( colon ) { + int port = atoi(colon+1); + uassert(13095, "HostAndPort: bad port #", port > 0); + _host = string(p,colon-p); + _port = port; + } + else { + // no port specified. + _host = p; + _port = -1; + } + } + +} diff --git a/util/httpclient.cpp b/util/httpclient.cpp index 08b6220..4f78029 100644 --- a/util/httpclient.cpp +++ b/util/httpclient.cpp @@ -15,11 +15,11 @@ * limitations under the License. */ -#include "stdafx.h" +#include "pch.h" #include "httpclient.h" #include "sock.h" #include "message.h" -#include "builder.h" +#include "../bson/util/builder.h" namespace mongo { @@ -94,15 +94,11 @@ namespace mongo { { const char * out = req.c_str(); int toSend = req.size(); - while ( toSend ){ - int did = p.send( out , toSend ); - toSend -= did; - out += did; - } + p.send( out , toSend, "_go" ); } char buf[4096]; - int got = p.recv( buf , 4096 ); + int got = p.unsafe_recv( buf , 4096 ); buf[got] = 0; int rc; @@ -114,19 +110,41 @@ namespace mongo { if ( result ) sb << buf; - while ( ( got = p.recv( buf , 4096 ) ) > 0){ + while ( ( got = p.unsafe_recv( buf , 4096 ) ) > 0){ if ( result ) sb << buf; } if ( result ){ - result->_code = rc; - result->_entireResponse = sb.str(); + result->_init( rc , sb.str() ); } return rc; } + void HttpClient::Result::_init( int code , string entire ){ + _code = code; + _entireResponse = entire; + + while ( true ){ + size_t i = entire.find( '\n' ); + if ( i == string::npos ){ + // invalid + break; + } + + string h = entire.substr( 0 , i ); + entire = entire.substr( i + 1 ); + + if ( h.size() && h[h.size()-1] == '\r' ) + h = h.substr( 0 , h.size() - 1 ); + + if ( h.size() == 0 ) + break; + } + + _body = entire; + } } diff --git a/util/httpclient.h b/util/httpclient.h index ef3e147..8b9da97 100644 --- a/util/httpclient.h +++ b/util/httpclient.h @@ -17,7 +17,7 @@ #pragma once -#include "../stdafx.h" +#include "../pch.h" namespace mongo { @@ -31,9 +31,25 @@ namespace mongo { const string& getEntireResponse() const { return _entireResponse; } + + const map<string,string> getHeaders() const { + return _headers; + } + + const string& getBody() const { + return _body; + } + private: + + void _init( int code , string entire ); + int _code; string _entireResponse; + + map<string,string> _headers; + string _body; + friend class HttpClient; }; diff --git a/util/log.cpp b/util/log.cpp new file mode 100644 index 0000000..bfd9154 --- /dev/null +++ b/util/log.cpp @@ -0,0 +1,127 @@ +/** @file log.cpp + */ + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "pch.h" +#include "assert_util.h" +#include "assert.h" +#include "file.h" +#include <cmath> +using namespace std; + +#ifndef _WIN32 +#include <cxxabi.h> +#include <sys/file.h> +#endif + +#include "../db/jsobj.h" + +namespace mongo { + + Nullstream nullstream; + vector<Tee*>* Logstream::globalTees = 0; + + thread_specific_ptr<Logstream> Logstream::tsp; + + class LoggingManager { + public: + LoggingManager() + : _enabled(0) , _file(0) { + } + + void start( const string& lp , bool append ){ + uassert( 10268 , "LoggingManager already started" , ! _enabled ); + _append = append; + + // test path + FILE * test = fopen( lp.c_str() , _append ? "a" : "w" ); + if ( ! test ){ + cout << "can't open [" << lp << "] for log file: " << errnoWithDescription() << endl; + dbexit( EXIT_BADOPTIONS ); + assert( 0 ); + } + fclose( test ); + + _path = lp; + _enabled = 1; + rotate(); + } + + void rotate(){ + if ( ! _enabled ){ + cout << "LoggingManager not enabled" << endl; + return; + } + + if ( _file ){ +#ifdef _WIN32 + cout << "log rotation doesn't work on windows" << endl; + return; +#else + struct tm t; + localtime_r( &_opened , &t ); + + stringstream ss; + ss << _path << "." << terseCurrentTime(false); + string s = ss.str(); + rename( _path.c_str() , s.c_str() ); +#endif + } + + + FILE* tmp = fopen(_path.c_str(), (_append ? "a" : "w")); + if (!tmp){ + cerr << "can't open: " << _path.c_str() << " for log file" << endl; + dbexit( EXIT_BADOPTIONS ); + assert(0); + } + + Logstream::setLogFile(tmp); // after this point no thread will be using old file + + if (_file){ + fclose(_file); + } + + _file = tmp; + _opened = time(0); + } + + private: + + bool _enabled; + string _path; + bool _append; + + FILE * _file; + time_t _opened; + + } loggingManager; + + void initLogging( const string& lp , bool append ){ + cout << "all output going to: " << lp << endl; + loggingManager.start( lp , append ); + } + + void rotateLogs( int signal ){ + loggingManager.rotate(); + } + + // done *before* static initialization + FILE* Logstream::logfile = stdout; + +} + @@ -1,4 +1,4 @@ -// log.h +// @file log.h /* Copyright 2009 10gen Inc. * @@ -19,12 +19,33 @@ #include <string.h> #include <errno.h> +#include "../bson/util/builder.h" -namespace mongo { +#ifndef _WIN32 +//#include <syslog.h> +#endif - using boost::shared_ptr; +namespace mongo { - // Utility interface for stringifying object only when val() called. + enum LogLevel { LL_DEBUG , LL_INFO , LL_NOTICE , LL_WARNING , LL_ERROR , LL_SEVERE }; + + inline const char * logLevelToString( LogLevel l ){ + switch ( l ){ + case LL_DEBUG: + case LL_INFO: + case LL_NOTICE: + return ""; + case LL_WARNING: + return "warning" ; + case LL_ERROR: + return "ERROR"; + case LL_SEVERE: + return "SEVERE"; + default: + return "UNKNOWN"; + } + } + class LazyString { public: virtual ~LazyString() {} @@ -36,17 +57,29 @@ namespace mongo { class LazyStringImpl : public LazyString { public: LazyStringImpl( const T &t ) : t_( t ) {} - virtual string val() const { return (string)t_; } + virtual string val() const { return t_.toString(); } private: const T& t_; }; + class Tee { + public: + virtual ~Tee(){} + virtual void write(LogLevel level , const string& str) = 0; + }; + class Nullstream { public: + virtual Nullstream& operator<< (Tee* tee) { + return *this; + } virtual ~Nullstream() {} virtual Nullstream& operator<<(const char *) { return *this; } + virtual Nullstream& operator<<(const string& ) { + return *this; + } virtual Nullstream& operator<<(char *) { return *this; } @@ -111,50 +144,118 @@ namespace mongo { virtual Nullstream& operator<< (ios_base& (*hex)(ios_base&)) { return *this; } - virtual void flush(){} + virtual void flush(Tee *t = 0) {} }; extern Nullstream nullstream; -#define LOGIT { ss << x; return *this; } - class Logstream : public Nullstream { static mongo::mutex mutex; static int doneSetup; stringstream ss; + LogLevel logLevel; + static FILE* logfile; + static boost::scoped_ptr<ostream> stream; + static vector<Tee*> * globalTees; public: + + static void logLockless( const StringData& s ){ + if ( doneSetup == 1717 ){ + fwrite( s.data() , s.size() , 1 , logfile ); + fflush( logfile ); + } + else { + cout << s.data() << endl; + } + } + + static void setLogFile(FILE* f){ + scoped_lock lk(mutex); + logfile = f; + } + static int magicNumber(){ return 1717; } - void flush() { + + void flush(Tee *t = 0) { // this ensures things are sane - if ( doneSetup == 1717 ){ + if ( doneSetup == 1717 ) { + string msg = ss.str(); + string threadName = getThreadName(); + const char * type = logLevelToString(logLevel); + + int spaceNeeded = msg.size() + 64 + threadName.size(); + int bufSize = 128; + while ( bufSize < spaceNeeded ) + bufSize += 128; + + BufBuilder b(bufSize); + time_t_to_String( time(0) , b.grow(20) ); + if (!threadName.empty()){ + b.appendChar( '[' ); + b.appendStr( threadName , false ); + b.appendChar( ']' ); + b.appendChar( ' ' ); + } + if ( type[0] ){ + b.appendStr( type , false ); + b.appendStr( ": " , false ); + } + b.appendStr( msg ); + + string out( b.buf() , b.len() - 1); + scoped_lock lk(mutex); - cout << ss.str(); - cout.flush(); + + if( t ) t->write(logLevel,out); + if ( globalTees ){ + for ( unsigned i=0; i<globalTees->size(); i++ ) + (*globalTees)[i]->write(logLevel,out); + } + +#ifndef _WIN32 + //syslog( LOG_INFO , "%s" , cc ); +#endif + fwrite(out.data(), out.size(), 1, logfile); + fflush(logfile); } - ss.str(""); + _init(); } - Logstream& operator<<(const char *x) LOGIT - Logstream& operator<<(char *x) LOGIT - Logstream& operator<<(char x) LOGIT - Logstream& operator<<(int x) LOGIT - Logstream& operator<<(ExitCode x) LOGIT - Logstream& operator<<(long x) LOGIT - Logstream& operator<<(unsigned long x) LOGIT - Logstream& operator<<(unsigned x) LOGIT - Logstream& operator<<(double x) LOGIT - Logstream& operator<<(void *x) LOGIT - Logstream& operator<<(const void *x) LOGIT - Logstream& operator<<(long long x) LOGIT - Logstream& operator<<(unsigned long long x) LOGIT - Logstream& operator<<(bool x) LOGIT + + Nullstream& setLogLevel(LogLevel l){ + logLevel = l; + return *this; + } + + /** note these are virtual */ + Logstream& operator<<(const char *x) { ss << x; return *this; } + Logstream& operator<<(const string& x) { ss << x; return *this; } + Logstream& operator<<(char *x) { ss << x; return *this; } + Logstream& operator<<(char x) { ss << x; return *this; } + Logstream& operator<<(int x) { ss << x; return *this; } + Logstream& operator<<(ExitCode x) { ss << x; return *this; } + Logstream& operator<<(long x) { ss << x; return *this; } + Logstream& operator<<(unsigned long x) { ss << x; return *this; } + Logstream& operator<<(unsigned x) { ss << x; return *this; } + Logstream& operator<<(double x) { ss << x; return *this; } + Logstream& operator<<(void *x) { ss << x; return *this; } + Logstream& operator<<(const void *x) { ss << x; return *this; } + Logstream& operator<<(long long x) { ss << x; return *this; } + Logstream& operator<<(unsigned long long x) { ss << x; return *this; } + Logstream& operator<<(bool x) { ss << x; return *this; } + Logstream& operator<<(const LazyString& x) { ss << x.val(); return *this; } + Nullstream& operator<< (Tee* tee) { + ss << '\n'; + flush(tee); + return *this; + } Logstream& operator<< (ostream& ( *_endl )(ostream&)) { ss << '\n'; - flush(); + flush(0); return *this; } Logstream& operator<< (ios_base& (*_hex)(ios_base&)) { @@ -168,20 +269,29 @@ namespace mongo { if ( ! t ) *this << "null"; else - *this << t; + *this << *t; return *this; } Logstream& prolog() { - char now[64]; - time_t_to_String(time(0), now); - now[20] = 0; - ss << now; return *this; } + + void addGlobalTee( Tee * t ){ + if ( ! globalTees ) + globalTees = new vector<Tee*>(); + globalTees->push_back( t ); + } private: static thread_specific_ptr<Logstream> tsp; + Logstream(){ + _init(); + } + void _init(){ + ss.str(""); + logLevel = LL_INFO; + } public: static Logstream& get() { Logstream *p = tsp.get(); @@ -192,6 +302,7 @@ namespace mongo { }; extern int logLevel; + extern int tlogLevel; inline Nullstream& out( int level = 0 ) { if ( level > logLevel ) @@ -203,7 +314,7 @@ namespace mongo { at the specified level or higher. */ inline void logflush(int level = 0) { if( level > logLevel ) - Logstream::get().flush(); + Logstream::get().flush(0); } /* without prolog */ @@ -213,12 +324,29 @@ namespace mongo { return Logstream::get(); } - inline Nullstream& log( int level = 0 ) { + /** logging which we may not want during unit tests runs. + set tlogLevel to -1 to suppress tlog() output in a test program. */ + inline Nullstream& tlog( int level = 0 ) { + if ( level > tlogLevel || level > logLevel ) + return nullstream; + return Logstream::get().prolog(); + } + + inline Nullstream& log( int level ) { if ( level > logLevel ) return nullstream; return Logstream::get().prolog(); } + inline Nullstream& log( LogLevel l ) { + return Logstream::get().prolog().setLogLevel( l ); + } + + + inline Nullstream& log() { + return Logstream::get().prolog(); + } + /* TODOCONCURRENCY */ inline ostream& stdcout() { return cout; @@ -242,9 +370,52 @@ namespace mongo { void initLogging( const string& logpath , bool append ); void rotateLogs( int signal = 0 ); -#define OUTPUT_ERRNOX(x) "errno:" << x << " " << strerror(x) -#define OUTPUT_ERRNO OUTPUT_ERRNOX(errno) + std::string toUtf8String(const std::wstring& wide); + + inline string errnoWithDescription(int x = errno) { + stringstream s; + s << "errno:" << x << ' '; + +#if defined(_WIN32) + LPTSTR errorText = NULL;
+ FormatMessage(
+ FORMAT_MESSAGE_FROM_SYSTEM
+ |FORMAT_MESSAGE_ALLOCATE_BUFFER
+ |FORMAT_MESSAGE_IGNORE_INSERTS,
+ NULL,
+ x, 0,
+ (LPTSTR) &errorText, // output
+ 0, // minimum size for output buffer
+ NULL);
+ if( errorText ) {
+ string x = toUtf8String(errorText);
+ for( string::iterator i = x.begin(); i != x.end(); i++ ) {
+ if( *i == '\n' || *i == '\r' )
+ break;
+ s << *i;
+ }
+ LocalFree(errorText);
+ }
+ else
+ s << strerror(x);
+ /*
+ DWORD n = FormatMessage(
+ FORMAT_MESSAGE_ALLOCATE_BUFFER |
+ FORMAT_MESSAGE_FROM_SYSTEM |
+ FORMAT_MESSAGE_IGNORE_INSERTS,
+ NULL, x,
+ MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
+ (LPTSTR) &lpMsgBuf, 0, NULL);
+ */
+#else
+ s << strerror(x);
+#endif
+ return s.str();
+ } - string errnostring( const char * prefix = 0 ); + /** output the error # and error message with prefix. + handy for use as parm in uassert/massert. + */ + string errnoWithPrefix( const char * prefix = 0 ); } // namespace mongo diff --git a/util/lruishmap.h b/util/lruishmap.h index c390cb2..fe8b1dc 100644 --- a/util/lruishmap.h +++ b/util/lruishmap.h @@ -17,7 +17,7 @@ #pragma once -#include "../stdafx.h" +#include "../pch.h" #include "../util/goodies.h" namespace mongo { diff --git a/util/md5main.cpp b/util/md5main.cpp index d5b4982..9c56f91 100644 --- a/util/md5main.cpp +++ b/util/md5main.cpp @@ -40,7 +40,7 @@ 2002-04-13 lpd Splits off main program into a separate file, md5main.c. */ -#include "stdafx.h" +#include "pch.h" #include "md5.h" #include <math.h> #include <stdio.h> diff --git a/util/message.cpp b/util/message.cpp index 2c3d006..d7c13dc 100644 --- a/util/message.cpp +++ b/util/message.cpp @@ -18,7 +18,7 @@ * limitations under the License. */ -#include "stdafx.h" +#include "pch.h" #include "message.h" #include <time.h> #include "../util/goodies.h" @@ -26,10 +26,25 @@ #include <fcntl.h> #include <errno.h> #include "../db/cmdline.h" +#include "../client/dbclient.h" + +#ifndef _WIN32 +#include <sys/resource.h> +#else + +// errno doesn't work for winsock. +#undef errno +#define errno WSAGetLastError() + +#endif namespace mongo { + bool noUnixSocket = false; + bool objcheck = false; + + void checkTicketNumbers(); // if you want trace output: #define mmm(x) @@ -42,54 +57,165 @@ namespace mongo { const int portRecvFlags = 0; #endif - /* listener ------------------------------------------------------------------- */ + const Listener* Listener::_timeTracker; - bool Listener::init() { - SockAddr me; - if ( ip.empty() ) - me = SockAddr( port ); - else - me = SockAddr( ip.c_str(), port ); - sock = ::socket(AF_INET, SOCK_STREAM, 0); - if ( sock == INVALID_SOCKET ) { - log() << "ERROR: listen(): invalid socket? " << OUTPUT_ERRNO << endl; - return false; - } - prebindOptions( sock ); - if ( ::bind(sock, (sockaddr *) &me.sa, me.addressSize) != 0 ) { - log() << "listen(): bind() failed " << OUTPUT_ERRNO << " for port: " << port << endl; - closesocket(sock); - return false; + vector<SockAddr> ipToAddrs(const char* ips, int port){ + vector<SockAddr> out; + if (*ips == '\0'){ + out.push_back(SockAddr("0.0.0.0", port)); // IPv4 all + + if (IPv6Enabled()) + out.push_back(SockAddr("::", port)); // IPv6 all +#ifndef _WIN32 + if (!noUnixSocket) + out.push_back(SockAddr(makeUnixSockPath(port).c_str(), port)); // Unix socket +#endif + return out; } - if ( ::listen(sock, 128) != 0 ) { - log() << "listen(): listen() failed " << OUTPUT_ERRNO << endl; - closesocket(sock); - return false; + while(*ips){ + string ip; + const char * comma = strchr(ips, ','); + if (comma){ + ip = string(ips, comma - ips); + ips = comma + 1; + }else{ + ip = string(ips); + ips = ""; + } + + SockAddr sa(ip.c_str(), port); + out.push_back(sa); + +#ifndef _WIN32 + if (!noUnixSocket && (sa.getAddr() == "127.0.0.1" || sa.getAddr() == "0.0.0.0")) // only IPv4 + out.push_back(SockAddr(makeUnixSockPath(port).c_str(), port)); +#endif } - - return true; + return out; + } - void Listener::listen() { + /* listener ------------------------------------------------------------------- */ + + void Listener::initAndListen() { + checkTicketNumbers(); + vector<SockAddr> mine = ipToAddrs(_ip.c_str(), _port); + vector<int> socks; + SOCKET maxfd = 0; // needed for select() + + for (vector<SockAddr>::iterator it=mine.begin(), end=mine.end(); it != end; ++it){ + SockAddr& me = *it; + + SOCKET sock = ::socket(me.getType(), SOCK_STREAM, 0); + if ( sock == INVALID_SOCKET ) { + log() << "ERROR: listen(): invalid socket? " << errnoWithDescription() << endl; + } + + if (me.getType() == AF_UNIX){ +#if !defined(_WIN32) + if (unlink(me.getAddr().c_str()) == -1){ + int x = errno; + if (x != ENOENT){ + log() << "couldn't unlink socket file " << me << errnoWithDescription(x) << " skipping" << endl; + continue; + } + } +#endif + } else if (me.getType() == AF_INET6) { + // IPv6 can also accept IPv4 connections as mapped addresses (::ffff:127.0.0.1) + // That causes a conflict if we don't do set it to IPV6_ONLY + const int one = 1; + setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (const char*) &one, sizeof(one)); + } + + prebindOptions( sock ); + + if ( ::bind(sock, me.raw(), me.addressSize) != 0 ) { + int x = errno; + log() << "listen(): bind() failed " << errnoWithDescription(x) << " for socket: " << me.toString() << endl; + if ( x == EADDRINUSE ) + log() << " addr already in use" << endl; + closesocket(sock); + return; + } + + if ( ::listen(sock, 128) != 0 ) { + log() << "listen(): listen() failed " << errnoWithDescription() << endl; + closesocket(sock); + return; + } + + ListeningSockets::get()->add( sock ); + + socks.push_back(sock); + if (sock > maxfd) + maxfd = sock; + } + static long connNumber = 0; - SockAddr from; + struct timeval maxSelectTime; while ( ! inShutdown() ) { - int s = accept(sock, (sockaddr *) &from.sa, &from.addressSize); - if ( s < 0 ) { - if ( errno == ECONNABORTED || errno == EBADF ) { - log() << "Listener on port " << port << " aborted" << endl; - return; - } - log() << "Listener: accept() returns " << s << " " << OUTPUT_ERRNO << endl; + fd_set fds[1]; + FD_ZERO(fds); + + for (vector<int>::iterator it=socks.begin(), end=socks.end(); it != end; ++it){ + FD_SET(*it, fds); + } + + maxSelectTime.tv_sec = 0; + maxSelectTime.tv_usec = 10000; + const int ret = select(maxfd+1, fds, NULL, NULL, &maxSelectTime); + + if (ret == 0){ + _elapsedTime += ( 10000 - maxSelectTime.tv_usec ) / 1000; continue; } - disableNagle(s); - if ( ! cmdLine.quiet ) log() << "connection accepted from " << from.toString() << " #" << ++connNumber << endl; - accepted( new MessagingPort(s, from) ); + _elapsedTime += ret; // assume 1ms to grab connection. very rough + + if (ret < 0){ + int x = errno; +#ifdef EINTR + if ( x == EINTR ){ + log() << "select() signal caught, continuing" << endl; + continue; + } +#endif + if ( ! inShutdown() ) + log() << "select() failure: ret=" << ret << " " << errnoWithDescription(x) << endl; + return; + } + + for (vector<int>::iterator it=socks.begin(), end=socks.end(); it != end; ++it){ + if (! (FD_ISSET(*it, fds))) + continue; + + SockAddr from; + int s = accept(*it, from.raw(), &from.addressSize); + if ( s < 0 ) { + int x = errno; // so no global issues + if ( x == ECONNABORTED || x == EBADF ) { + log() << "Listener on port " << _port << " aborted" << endl; + return; + } if ( x == 0 && inShutdown() ){ + return; // socket closed + } + log() << "Listener: accept() returns " << s << " " << errnoWithDescription(x) << endl; + continue; + } + if (from.getType() != AF_UNIX) + disableNagle(s); + if ( _logConnect && ! cmdLine.quiet ) + log() << "connection accepted from " << from.toString() << " #" << ++connNumber << endl; + accepted(s, from); + } } } + void Listener::accepted(int sock, const SockAddr& from){ + accepted( new MessagingPort(sock, from) ); + } + /* messagingport -------------------------------------------------------------- */ class PiggyBackData { @@ -101,27 +227,28 @@ namespace mongo { } ~PiggyBackData() { - flush(); - delete( _cur ); + DESTRUCTOR_GUARD ( + flush(); + delete[]( _cur ); + ); } void append( Message& m ) { - assert( m.data->len <= 1300 ); + assert( m.header()->len <= 1300 ); - if ( len() + m.data->len > 1300 ) + if ( len() + m.header()->len > 1300 ) flush(); - memcpy( _cur , m.data , m.data->len ); - _cur += m.data->len; + memcpy( _cur , m.singleData() , m.header()->len ); + _cur += m.header()->len; } - int flush() { + void flush() { if ( _buf == _cur ) - return 0; + return; - int x = _port->send( _buf , len() ); + _port->send( _buf , len(), "flush" ); _cur = _buf; - return x; } int len() { @@ -137,12 +264,10 @@ namespace mongo { }; class Ports { - set<MessagingPort*>& ports; + set<MessagingPort*> ports; mongo::mutex m; public: - // we "new" this so it is still be around when other automatic global vars - // are being destructed during termination. - Ports() : ports( *(new set<MessagingPort*>()) ) {} + Ports() : ports(), m("Ports") {} void closeAll() { \ scoped_lock bl(m); for ( set<MessagingPort*>::iterator i = ports.begin(); i != ports.end(); i++ ) @@ -156,7 +281,11 @@ namespace mongo { scoped_lock bl(m); ports.erase(p); } - } ports; + }; + + // we "new" this so it is still be around when other automatic global vars + // are being destructed during termination. + Ports& ports = *(new Ports()); @@ -164,14 +293,17 @@ namespace mongo { ports.closeAll(); } - MessagingPort::MessagingPort(int _sock, SockAddr& _far) : sock(_sock), piggyBackData(0), farEnd(_far) { + MessagingPort::MessagingPort(int _sock, const SockAddr& _far) : sock(_sock), piggyBackData(0), farEnd(_far), _timeout() { + _logLevel = 0; ports.insert(this); } - MessagingPort::MessagingPort() { + MessagingPort::MessagingPort( int timeout, int ll ) { + _logLevel = ll; ports.insert(this); sock = -1; piggyBackData = 0; + _timeout = timeout; } void MessagingPort::shutdown() { @@ -194,42 +326,30 @@ namespace mongo { int res; SockAddr farEnd; void run() { - res = ::connect(sock, (sockaddr *) &farEnd.sa, farEnd.addressSize); + res = ::connect(sock, farEnd.raw(), farEnd.addressSize); } + string name() { return "ConnectBG"; } }; bool MessagingPort::connect(SockAddr& _far) { farEnd = _far; - sock = socket(AF_INET, SOCK_STREAM, 0); + sock = socket(farEnd.getType(), SOCK_STREAM, 0); if ( sock == INVALID_SOCKET ) { - log() << "ERROR: connect(): invalid socket? " << OUTPUT_ERRNO << endl; + log(_logLevel) << "ERROR: connect(): invalid socket? " << errnoWithDescription() << endl; return false; } -#if 0 - long fl = fcntl(sock, F_GETFL, 0); - assert( fl >= 0 ); - fl |= O_NONBLOCK; - fcntl(sock, F_SETFL, fl); - - int res = ::connect(sock, (sockaddr *) &farEnd.sa, farEnd.addressSize); - if ( res ) { - if ( errno == EINPROGRESS ) - closesocket(sock); - sock = -1; - return false; + if ( _timeout > 0 ) { + setSockTimeouts( sock, _timeout ); } - -#endif - + ConnectBG bg; bg.sock = sock; bg.farEnd = farEnd; bg.go(); - // int res = ::connect(sock, (sockaddr *) &farEnd.sa, farEnd.addressSize); if ( bg.wait(5000) ) { if ( bg.res ) { closesocket(sock); @@ -245,7 +365,8 @@ namespace mongo { return false; } - disableNagle(sock); + if (farEnd.getType() != AF_UNIX) + disableNagle(sock); #ifdef SO_NOSIGPIPE // osx @@ -257,94 +378,65 @@ namespace mongo { } bool MessagingPort::recv(Message& m) { -again: - mmm( out() << "* recv() sock:" << this->sock << endl; ) - int len = -1; - - char *lenbuf = (char *) &len; - int lft = 4; - while ( 1 ) { - int x = recv( lenbuf, lft ); - if ( x == 0 ) { - DEV out() << "MessagingPort recv() conn closed? " << farEnd.toString() << endl; - m.reset(); - return false; - } - if ( x < 0 ) { - log() << "MessagingPort recv() " << OUTPUT_ERRNO << " " << farEnd.toString()<<endl; - m.reset(); - return false; - } - lft -= x; - if ( lft == 0 ) - break; - lenbuf += x; - log() << "MessagingPort recv() got " << x << " bytes wanted 4, lft=" << lft << endl; - assert( lft > 0 ); - } - - if ( len < 0 || len > 16000000 ) { - if ( len == -1 ) { - // Endian check from the database, after connecting, to see what mode server is running in. - unsigned foo = 0x10203040; - int x = send( (char *) &foo, 4 ); - if ( x <= 0 ) { - log() << "MessagingPort endian send() " << OUTPUT_ERRNO << ' ' << farEnd.toString() << endl; + try { + again: + mmm( log() << "* recv() sock:" << this->sock << endl; ) + int len = -1; + + char *lenbuf = (char *) &len; + int lft = 4; + recv( lenbuf, lft ); + + if ( len < 16 || len > 16000000 ) { // messages must be large enough for headers + if ( len == -1 ) { + // Endian check from the database, after connecting, to see what mode server is running in. + unsigned foo = 0x10203040; + send( (char *) &foo, 4, "endian" ); + goto again; + } + + if ( len == 542393671 ){ + // an http GET + log(_logLevel) << "looks like you're trying to access db over http on native driver port. please add 1000 for webserver" << endl; + string msg = "You are trying to access MongoDB on the native driver port. For http diagnostic access, add 1000 to the port number\n"; + stringstream ss; + ss << "HTTP/1.0 200 OK\r\nConnection: close\r\nContent-Type: text/plain\r\nContent-Length: " << msg.size() << "\r\n\r\n" << msg; + string s = ss.str(); + send( s.c_str(), s.size(), "http" ); return false; } - goto again; - } - - if ( len == 542393671 ){ - // an http GET - log() << "looks like you're trying to access db over http on native driver port. please add 1000 for webserver" << endl; - string msg = "You are trying to access MongoDB on the native driver port. For http diagnostic access, add 1000 to the port number\n"; - stringstream ss; - ss << "HTTP/1.0 200 OK\r\nConnection: close\r\nContent-Type: text/plain\r\nContent-Length: " << msg.size() << "\r\n\r\n" << msg; - string s = ss.str(); - send( s.c_str(), s.size() ); - return false; - } - log() << "bad recv() len: " << len << '\n'; - return false; - } - - int z = (len+1023)&0xfffffc00; - assert(z>=len); - MsgData *md = (MsgData *) malloc(z); - md->len = len; - - if ( len <= 0 ) { - out() << "got a length of " << len << ", something is wrong" << endl; - return false; - } - - char *p = (char *) &md->id; - int left = len -4; - while ( 1 ) { - int x = recv( p, left ); - if ( x == 0 ) { - DEV out() << "MessagingPort::recv(): conn closed? " << farEnd.toString() << endl; - m.reset(); + log(_logLevel) << "bad recv() len: " << len << '\n'; return false; } - if ( x < 0 ) { - log() << "MessagingPort recv() " << OUTPUT_ERRNO << ' ' << farEnd.toString() << endl; - m.reset(); - return false; + + int z = (len+1023)&0xfffffc00; + assert(z>=len); + MsgData *md = (MsgData *) malloc(z); + assert(md); + md->len = len; + + char *p = (char *) &md->id; + int left = len -4; + + try { + recv( p, left ); + } catch (...) { + free(md); + throw; } - left -= x; - p += x; - if ( left <= 0 ) - break; + + m.setData(md, true); + return true; + + } catch ( const SocketException & e ) { + log(_logLevel + (e.shouldPrint() ? 0 : 1) ) << "SocketException: " << e << endl; + m.reset(); + return false; } - - m.setData(md, true); - return true; } - + void MessagingPort::reply(Message& received, Message& response) { - say(/*received.from, */response, received.data->id); + say(/*received.from, */response, received.header()->id); } void MessagingPort::reply(Message& received, Message& response, MSGID responseTo) { @@ -352,79 +444,171 @@ again: } bool MessagingPort::call(Message& toSend, Message& response) { - mmm( out() << "*call()" << endl; ) - MSGID old = toSend.data->id; + mmm( log() << "*call()" << endl; ) + MSGID old = toSend.header()->id; say(/*to,*/ toSend); while ( 1 ) { bool ok = recv(response); if ( !ok ) return false; - //out() << "got response: " << response.data->responseTo << endl; - if ( response.data->responseTo == toSend.data->id ) + //log() << "got response: " << response.data->responseTo << endl; + if ( response.header()->responseTo == toSend.header()->id ) break; - out() << "********************" << endl; - out() << "ERROR: MessagingPort::call() wrong id got:" << (unsigned)response.data->responseTo << " expect:" << (unsigned)toSend.data->id << endl; - out() << " toSend op: " << toSend.data->operation() << " old id:" << (unsigned)old << endl; - out() << " response msgid:" << (unsigned)response.data->id << endl; - out() << " response len: " << (unsigned)response.data->len << endl; - out() << " response op: " << response.data->operation() << endl; - out() << " farEnd: " << farEnd << endl; + log() << "********************" << endl; + log() << "ERROR: MessagingPort::call() wrong id got:" << hex << (unsigned)response.header()->responseTo << " expect:" << (unsigned)toSend.header()->id << endl; + log() << " toSend op: " << toSend.operation() << " old id:" << (unsigned)old << endl; + log() << " response msgid:" << (unsigned)response.header()->id << endl; + log() << " response len: " << (unsigned)response.header()->len << endl; + log() << " response op: " << response.operation() << endl; + log() << " farEnd: " << farEnd << endl; assert(false); response.reset(); } - mmm( out() << "*call() end" << endl; ) + mmm( log() << "*call() end" << endl; ) return true; } void MessagingPort::say(Message& toSend, int responseTo) { - assert( toSend.data ); - mmm( out() << "* say() sock:" << this->sock << " thr:" << GetCurrentThreadId() << endl; ) - toSend.data->id = nextMessageId(); - toSend.data->responseTo = responseTo; - - int x = -100; + assert( !toSend.empty() ); + mmm( log() << "* say() sock:" << this->sock << " thr:" << GetCurrentThreadId() << endl; ) + toSend.header()->id = nextMessageId(); + toSend.header()->responseTo = responseTo; if ( piggyBackData && piggyBackData->len() ) { - mmm( out() << "* have piggy back" << endl; ) - if ( ( piggyBackData->len() + toSend.data->len ) > 1300 ) { + mmm( log() << "* have piggy back" << endl; ) + if ( ( piggyBackData->len() + toSend.header()->len ) > 1300 ) { // won't fit in a packet - so just send it off piggyBackData->flush(); } else { piggyBackData->append( toSend ); - x = piggyBackData->flush(); + piggyBackData->flush(); + return; } } - if ( x == -100 ) - x = send( (char*)toSend.data, toSend.data->len ); - - if ( x <= 0 ) { - log() << "MessagingPort say send() " << OUTPUT_ERRNO << ' ' << farEnd.toString() << endl; - throw SocketException(); - } - + toSend.send( *this, "say" ); } - int MessagingPort::send( const char * data , const int len ){ - return ::send( sock , data , len , portSendFlags ); + // sends all data or throws an exception + void MessagingPort::send( const char * data , int len, const char *context ) { + while( len > 0 ) { + int ret = ::send( sock , data , len , portSendFlags ); + if ( ret == -1 ) { + if ( errno != EAGAIN || _timeout == 0 ) { + log(_logLevel) << "MessagingPort " << context << " send() " << errnoWithDescription() << ' ' << farEnd.toString() << endl; + throw SocketException( SocketException::SEND_ERROR ); + } else { + if ( !serverAlive( farEnd.toString() ) ) { + log(_logLevel) << "MessagingPort " << context << " send() remote dead " << farEnd.toString() << endl; + throw SocketException( SocketException::SEND_ERROR ); + } + } + } else { + assert( ret <= len ); + len -= ret; + data += ret; + } + } } - int MessagingPort::recv( char * buf , int max ){ - return ::recv( sock , buf , max , portRecvFlags ); + // sends all data or throws an exception + void MessagingPort::send( const vector< pair< char *, int > > &data, const char *context ){ +#if defined(_WIN32) + // TODO use scatter/gather api + for( vector< pair< char *, int > >::const_iterator i = data.begin(); i != data.end(); ++i ) { + char * data = i->first; + int len = i->second; + send( data, len, context ); + } +#else + vector< struct iovec > d( data.size() ); + int i = 0; + for( vector< pair< char *, int > >::const_iterator j = data.begin(); j != data.end(); ++j ) { + if ( j->second > 0 ) { + d[ i ].iov_base = j->first; + d[ i ].iov_len = j->second; + ++i; + } + } + struct msghdr meta; + memset( &meta, 0, sizeof( meta ) ); + meta.msg_iov = &d[ 0 ]; + meta.msg_iovlen = d.size(); + + while( meta.msg_iovlen > 0 ) { + int ret = ::sendmsg( sock , &meta , portSendFlags ); + if ( ret == -1 ) { + if ( errno != EAGAIN || _timeout == 0 ) { + log(_logLevel) << "MessagingPort " << context << " send() " << errnoWithDescription() << ' ' << farEnd.toString() << endl; + throw SocketException( SocketException::SEND_ERROR ); + } else { + if ( !serverAlive( farEnd.toString() ) ) { + log(_logLevel) << "MessagingPort " << context << " send() remote dead " << farEnd.toString() << endl; + throw SocketException( SocketException::SEND_ERROR ); + } + } + } else { + struct iovec *& i = meta.msg_iov; + while( ret > 0 ) { + if ( i->iov_len > unsigned( ret ) ) { + i->iov_len -= ret; + i->iov_base = (char*)(i->iov_base) + ret; + ret = 0; + } else { + ret -= i->iov_len; + ++i; + --(meta.msg_iovlen); + } + } + } + } +#endif } + void MessagingPort::recv( char * buf , int len ){ + while( len > 0 ) { + int ret = ::recv( sock , buf , len , portRecvFlags ); + if ( ret == 0 ) { + log(3) << "MessagingPort recv() conn closed? " << farEnd.toString() << endl; + throw SocketException( SocketException::CLOSED ); + } + if ( ret == -1 ) { + int e = errno; + if ( e != EAGAIN || _timeout == 0 ) { + log(_logLevel) << "MessagingPort recv() " << errnoWithDescription(e) << " " << farEnd.toString() <<endl; + throw SocketException( SocketException::RECV_ERROR ); + } else { + if ( !serverAlive( farEnd.toString() ) ) { + log(_logLevel) << "MessagingPort recv() remote dead " << farEnd.toString() << endl; + throw SocketException( SocketException::RECV_ERROR ); + } + } + } else { + if ( len <= 4 && ret != len ) + log(_logLevel) << "MessagingPort recv() got " << ret << " bytes wanted len=" << len << endl; + assert( ret <= len ); + len -= ret; + buf += ret; + } + } + } + + int MessagingPort::unsafe_recv( char *buf, int max ) { + return ::recv( sock , buf , max , portRecvFlags ); + } + void MessagingPort::piggyBack( Message& toSend , int responseTo ) { - if ( toSend.data->len > 1300 ) { + if ( toSend.header()->len > 1300 ) { // not worth saving because its almost an entire packet say( toSend ); return; } // we're going to be storing this, so need to set it up - toSend.data->id = nextMessageId(); - toSend.data->responseTo = responseTo; + toSend.header()->id = nextMessageId(); + toSend.header()->responseTo = responseTo; if ( ! piggyBackData ) piggyBackData = new PiggyBackData( this ); @@ -432,10 +616,15 @@ again: piggyBackData->append( toSend ); } - unsigned MessagingPort::remotePort(){ + unsigned MessagingPort::remotePort() const { return farEnd.getPort(); } + HostAndPort MessagingPort::remote() const { + return farEnd; + } + + MSGID NextMsgId; bool usingClientIds = 0; ThreadLocalValue<int> clientId; @@ -473,4 +662,62 @@ again: return clientId.get(); } + int getMaxConnections(){ +#ifdef _WIN32 + return 20000; +#else + struct rlimit limit; + assert( getrlimit(RLIMIT_NOFILE,&limit) == 0 ); + + int max = (int)(limit.rlim_cur * .8); + + log(1) << "fd limit" + << " hard:" << limit.rlim_max + << " soft:" << limit.rlim_cur + << " max conn: " << max + << endl; + + if ( max > 20000 ) + max = 20000; + + return max; +#endif + } + + void checkTicketNumbers(){ + connTicketHolder.resize( getMaxConnections() ); + } + + TicketHolder connTicketHolder(20000); + + namespace { + map<string, bool> isSelfCache; // host, isSelf + } + + bool HostAndPort::isSelf() const { + int p = _port == -1 ? CmdLine::DefaultDBPort : _port; + + if( p != cmdLine.port ){ + return false; + } else if (sameHostname(getHostName(), _host) || isLocalHost()) { + return true; + } else { + map<string, bool>::const_iterator it = isSelfCache.find(_host); + if (it != isSelfCache.end()){ + return it->second; + } + + SockAddr addr (_host.c_str(), 0); // port 0 is dynamically assigned + SOCKET sock = ::socket(addr.getType(), SOCK_STREAM, 0); + assert(sock != INVALID_SOCKET); + + bool ret = (::bind(sock, addr.raw(), addr.addressSize) == 0); + isSelfCache[_host] = ret; + + closesocket(sock); + + return ret; + } + } + } // namespace mongo diff --git a/util/message.h b/util/message.h index 5dccaef..13f31df 100644 --- a/util/message.h +++ b/util/message.h @@ -1,4 +1,4 @@ -// message.h +// Message.h /* Copyright 2009 10gen Inc. * @@ -18,10 +18,13 @@ #pragma once #include "../util/sock.h" -#include "../util/atomic_int.h" +#include "../bson/util/atomic_int.h" +#include "hostandport.h" namespace mongo { + extern bool noUnixSocket; + class Message; class MessagingPort; class PiggyBackData; @@ -29,18 +32,46 @@ namespace mongo { class Listener { public: - Listener(const string &_ip, int p) : ip(_ip), port(p) { } - virtual ~Listener() {} - bool init(); // set up socket - int socket() const { return sock; } - void listen(); // never returns (start a thread) + Listener(const string &ip, int p, bool logConnect=true ) : _port(p), _ip(ip), _logConnect(logConnect), _elapsedTime(0){ } + virtual ~Listener() { + if ( _timeTracker == this ) + _timeTracker = 0; + } + void initAndListen(); // never returns unless error (start a thread) /* spawn a thread, etc., then return */ - virtual void accepted(MessagingPort *mp) = 0; + virtual void accepted(int sock, const SockAddr& from); + virtual void accepted(MessagingPort *mp){ + assert(!"You must overwrite one of the accepted methods"); + } + + const int _port; + + /** + * @return a rough estimate of elepased time since the server started + */ + long long getMyElapsedTimeMillis() const { return _elapsedTime; } + + void setAsTimeTracker(){ + _timeTracker = this; + } + + static const Listener* getTimeTracker(){ + return _timeTracker; + } + + static long long getElapsedTimeMillis() { + if ( _timeTracker ) + return _timeTracker->getMyElapsedTimeMillis(); + return 0; + } + private: - string ip; - int port; - int sock; + string _ip; + bool _logConnect; + long long _elapsedTime; + + static const Listener* _timeTracker; }; class AbstractMessagingPort { @@ -49,13 +80,25 @@ namespace mongo { virtual void reply(Message& received, Message& response, MSGID responseTo) = 0; // like the reply below, but doesn't rely on received.data still being available virtual void reply(Message& received, Message& response) = 0; - virtual unsigned remotePort() = 0 ; + virtual HostAndPort remote() const = 0; + virtual unsigned remotePort() const = 0; + + virtual int getClientId(){ + int x = remotePort(); + x = x << 16; + return x; + } }; class MessagingPort : public AbstractMessagingPort { public: - MessagingPort(int sock, SockAddr& farEnd); - MessagingPort(); + MessagingPort(int sock, const SockAddr& farEnd); + + // in some cases the timeout will actually be 2x this value - eg we do a partial send, + // then the timeout fires, then we try to send again, then the timeout fires again with + // no data sent, then we detect that the other side is down + MessagingPort(int timeout = 0, int logLevel = 0 ); + virtual ~MessagingPort(); void shutdown(); @@ -73,22 +116,28 @@ namespace mongo { void piggyBack( Message& toSend , int responseTo = -1 ); - virtual unsigned remotePort(); + virtual unsigned remotePort() const; + virtual HostAndPort remote() const; + + // send len or throw SocketException + void send( const char * data , int len, const char *context ); + void send( const vector< pair< char *, int > > &data, const char *context ); - int send( const char * data , const int len ); - int recv( char * data , int max ); + // recv len or throw SocketException + void recv( char * data , int len ); + + int unsafe_recv( char *buf, int max ); private: int sock; PiggyBackData * piggyBackData; public: SockAddr farEnd; + int _timeout; + int _logLevel; // passed to log() when logging errors friend class PiggyBackData; }; - //#pragma pack() -#pragma pack(1) - enum Operations { opReply = 1, /* reply. responseTo is set. */ dbMsg = 1000, /* generic msg command followed by a string */ @@ -121,6 +170,27 @@ namespace mongo { } } +#pragma pack(1) +/* see http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol +*/ +struct MSGHEADER { + int messageLength; // total message size, including this + int requestID; // identifier for this message + int responseTo; // requestID from the original request + // (used in reponses from db) + int opCode; +}; +struct OP_GETMORE : public MSGHEADER { + MSGHEADER header; // standard message header + int ZERO_or_flags; // 0 - reserved for future use + //cstring fullCollectionName; // "dbname.collectionname" + //int32 numberToReturn; // number of documents to return + //int64 cursorID; // cursorID from the OP_REPLY +}; +#pragma pack() + +#pragma pack(1) + /* todo merge this with MSGHEADER (or inherit from it). */ struct MsgData { int len; /* len of the msg, including this field */ MSGID id; /* request/reply id's match... */ @@ -146,88 +216,232 @@ namespace mongo { return true; } + long long getCursor(){ + assert( responseTo > 0 ); + assert( _operation == opReply ); + long long * l = (long long *)(_data + 4); + return l[0]; + } + int dataLen(); // len without header }; const int MsgDataHeaderSize = sizeof(MsgData) - 4; inline int MsgData::dataLen() { return len - MsgDataHeaderSize; } - #pragma pack() class Message { public: - Message() { - data = 0; - freeIt = false; - } - Message( void * _data , bool _freeIt ) { - data = (MsgData*)_data; - freeIt = _freeIt; + // we assume here that a vector with initial size 0 does no allocation (0 is the default, but wanted to make it explicit). + Message() : _buf( 0 ), _data( 0 ), _freeIt( false ) {} + Message( void * data , bool freeIt ) : + _buf( 0 ), _data( 0 ), _freeIt( false ) { + _setData( reinterpret_cast< MsgData* >( data ), freeIt ); }; + Message(Message& r) : _buf( 0 ), _data( 0 ), _freeIt( false ) { + *this = r; + } ~Message() { reset(); } - SockAddr from; - MsgData *data; + SockAddr _from; - int operation() const { - return data->operation(); + MsgData *header() const { + assert( !empty() ); + return _buf ? _buf : reinterpret_cast< MsgData* > ( _data[ 0 ].first ); + } + int operation() const { return header()->operation(); } + + MsgData *singleData() const { + massert( 13273, "single data buffer expected", _buf ); + return header(); } + bool empty() const { return !_buf && _data.empty(); } + + int size() const{ + int res = 0; + if ( _buf ){ + res = _buf->len; + } else { + for (MsgVec::const_iterator it = _data.begin(); it != _data.end(); ++it){ + res += it->second; + } + } + return res; + } + + // concat multiple buffers - noop if <2 buffers already, otherwise can be expensive copy + // can get rid of this if we make response handling smarter + void concat() { + if ( _buf || empty() ) { + return; + } + + assert( _freeIt ); + int totalSize = 0; + for( vector< pair< char *, int > >::const_iterator i = _data.begin(); i != _data.end(); ++i ) { + totalSize += i->second; + } + char *buf = (char*)malloc( totalSize ); + char *p = buf; + for( vector< pair< char *, int > >::const_iterator i = _data.begin(); i != _data.end(); ++i ) { + memcpy( p, i->first, i->second ); + p += i->second; + } + reset(); + _setData( (MsgData*)buf, true ); + } + + // vector swap() so this is fast Message& operator=(Message& r) { - assert( data == 0 ); - data = r.data; - assert( r.freeIt ); - r.freeIt = false; - r.data = 0; - freeIt = true; + assert( empty() ); + assert( r._freeIt ); + _buf = r._buf; + r._buf = 0; + if ( r._data.size() > 0 ) { + _data.swap( r._data ); + } + r._freeIt = false; + _freeIt = true; return *this; } void reset() { - if ( freeIt && data ) - free(data); - data = 0; - freeIt = false; + if ( _freeIt ) { + if ( _buf ) { + free( _buf ); + } + for( vector< pair< char *, int > >::const_iterator i = _data.begin(); i != _data.end(); ++i ) { + free(i->first); + } + } + _buf = 0; + _data.clear(); + _freeIt = false; } - void setData(MsgData *d, bool _freeIt) { - assert( data == 0 ); - freeIt = _freeIt; - data = d; + // use to add a buffer + // assumes message will free everything + void appendData(char *d, int size) { + if ( size <= 0 ) { + return; + } + if ( empty() ) { + MsgData *md = (MsgData*)d; + md->len = size; // can be updated later if more buffers added + _setData( md, true ); + return; + } + assert( _freeIt ); + if ( _buf ) { + _data.push_back( make_pair( (char*)_buf, _buf->len ) ); + _buf = 0; + } + _data.push_back( make_pair( d, size ) ); + header()->len += size; + } + + // use to set first buffer if empty + void setData(MsgData *d, bool freeIt) { + assert( empty() ); + _setData( d, freeIt ); } void setData(int operation, const char *msgtxt) { setData(operation, msgtxt, strlen(msgtxt)+1); } void setData(int operation, const char *msgdata, size_t len) { - assert(data == 0); + assert( empty() ); size_t dataLen = len + sizeof(MsgData) - 4; MsgData *d = (MsgData *) malloc(dataLen); memcpy(d->_data, msgdata, len); d->len = fixEndian(dataLen); d->setOperation(operation); - freeIt= true; - data = d; + _setData( d, true ); } bool doIFreeIt() { - return freeIt; + return _freeIt; + } + + void send( MessagingPort &p, const char *context ) { + if ( empty() ) { + return; + } + if ( _buf != 0 ) { + p.send( (char*)_buf, _buf->len, context ); + } else { + p.send( _data, context ); + } } private: - bool freeIt; + void _setData( MsgData *d, bool freeIt ) { + _freeIt = freeIt; + _buf = d; + } + // if just one buffer, keep it in _buf, otherwise keep a sequence of buffers in _data + MsgData * _buf; + // byte buffer(s) - the first must contain at least a full MsgData unless using _buf for storage instead + typedef vector< pair< char*, int > > MsgVec; + MsgVec _data; + bool _freeIt; }; class SocketException : public DBException { public: - virtual const char* what() const throw() { return "socket exception"; } - virtual int getCode(){ return 9001; } + enum Type { CLOSED , RECV_ERROR , SEND_ERROR } type; + SocketException( Type t ) : DBException( "socket exception" , 9001 ) , type(t){} + + bool shouldPrint() const { + return type != CLOSED; + } + }; MSGID nextMessageId(); void setClientId( int id ); int getClientId(); + + extern TicketHolder connTicketHolder; + + class ElapsedTracker { + public: + ElapsedTracker( int hitsBetweenMarks , int msBetweenMarks ) + : _h( hitsBetweenMarks ) , _ms( msBetweenMarks ) , _pings(0){ + _last = Listener::getElapsedTimeMillis(); + } + + /** + * call this for every iteration + * returns true if one of the triggers has gone off + */ + bool ping(){ + if ( ( ++_pings % _h ) == 0 ){ + _last = Listener::getElapsedTimeMillis(); + return true; + } + + long long now = Listener::getElapsedTimeMillis(); + if ( now - _last > _ms ){ + _last = now; + return true; + } + + return false; + } + + private: + int _h; + int _ms; + + unsigned long long _pings; + + long long _last; + + }; + } // namespace mongo diff --git a/util/message_server.h b/util/message_server.h index cc40b76..9d6a8f2 100644 --- a/util/message_server.h +++ b/util/message_server.h @@ -22,7 +22,7 @@ #pragma once -#include "../stdafx.h" +#include "../pch.h" namespace mongo { @@ -30,20 +30,23 @@ namespace mongo { public: virtual ~MessageHandler(){} virtual void process( Message& m , AbstractMessagingPort* p ) = 0; + virtual void disconnected( AbstractMessagingPort* p ) = 0; }; - + class MessageServer { public: - MessageServer( int port , MessageHandler * handler ) : _port( port ) , _handler( handler ){} - virtual ~MessageServer(){} + struct Options { + int port; // port to bind to + string ipList; // addresses to bind to + Options() : port(0), ipList(""){} + }; + + virtual ~MessageServer(){} virtual void run() = 0; - - protected: - - int _port; - MessageHandler* _handler; + virtual void setAsTimeTracker() = 0; }; - MessageServer * createServer( int port , MessageHandler * handler ); + // TODO use a factory here to decide between port and asio variations + MessageServer * createServer( const MessageServer::Options& opts , MessageHandler * handler ); } diff --git a/util/message_server_asio.cpp b/util/message_server_asio.cpp index 7fca29a..0c9479c 100644 --- a/util/message_server_asio.cpp +++ b/util/message_server_asio.cpp @@ -27,7 +27,7 @@ #include "message.h" #include "message_server.h" -#include "../util/mvar.h" +#include "../util/concurrency/mvar.h" using namespace boost; using namespace boost::asio; @@ -204,9 +204,11 @@ namespace mongo { class AsyncMessageServer : public MessageServer { public: - AsyncMessageServer( int port , MessageHandler * handler ) - : MessageServer( port , handler ) - , _endpoint( tcp::v4() , port ) + // TODO accept an IP address to bind to + AsyncMessageServer( const MessageServer::Options& opts , MessageHandler * handler ) + : _port( opts.port ) + , _handler(handler) + , _endpoint( tcp::v4() , opts.port ) , _acceptor( _ioservice , _endpoint ) { _accept(); @@ -232,7 +234,7 @@ namespace mongo { _accept(); } - void _accept(){ + void _accept( ){ shared_ptr<MessageServerSession> session( new MessageServerSession( _handler , _ioservice ) ); _acceptor.async_accept( session->socket() , boost::bind( &AsyncMessageServer::handleAccept, @@ -243,13 +245,15 @@ namespace mongo { } private: + int _port; + MessageHandler * _handler; io_service _ioservice; tcp::endpoint _endpoint; tcp::acceptor _acceptor; }; - MessageServer * createServer( int port , MessageHandler * handler ){ - return new AsyncMessageServer( port , handler ); + MessageServer * createServer( const MessageServer::Options& opts , MessageHandler * handler ){ + return new AsyncMessageServer( opts , handler ); } } diff --git a/util/message_server_port.cpp b/util/message_server_port.cpp index 2350ec2..9649e45 100644 --- a/util/message_server_port.cpp +++ b/util/message_server_port.cpp @@ -15,32 +15,41 @@ * limitations under the License. */ -#include "stdafx.h" +#include "pch.h" #ifndef USE_ASIO #include "message.h" #include "message_server.h" +#include "../db/cmdline.h" + namespace mongo { namespace pms { - MessagingPort * grab = 0; MessageHandler * handler; - - void threadRun(){ - assert( grab ); - auto_ptr<MessagingPort> p( grab ); - grab = 0; + + void threadRun( MessagingPort * inPort){ + assert( inPort ); + setThreadName( "conn" ); + TicketHolderReleaser connTicketReleaser( &connTicketHolder ); + + auto_ptr<MessagingPort> p( inPort ); + + string otherSide; + Message m; try { + otherSide = p->farEnd.toString(); + while ( 1 ){ m.reset(); if ( ! p->recv(m) ) { - log() << "end connection " << p->farEnd.toString() << endl; + if( !cmdLine.quiet ) + log() << "end connection " << otherSide << endl; p->shutdown(); break; } @@ -48,42 +57,69 @@ namespace mongo { handler->process( m , p.get() ); } } + catch ( const SocketException& ){ + log() << "unclean socket shutdown from: " << otherSide << endl; + } + catch ( const std::exception& e ){ + problem() << "uncaught exception (" << e.what() << ")(" << demangleName( typeid(e) ) <<") in PortMessageServer::threadRun, closing connection" << endl; + } catch ( ... ){ problem() << "uncaught exception in PortMessageServer::threadRun, closing connection" << endl; } + handler->disconnected( p.get() ); } } class PortMessageServer : public MessageServer , public Listener { public: - PortMessageServer( int port , MessageHandler * handler ) : - MessageServer( port , handler ) , - Listener( "", port ){ + PortMessageServer( const MessageServer::Options& opts, MessageHandler * handler ) : + Listener( opts.ipList, opts.port ){ uassert( 10275 , "multiple PortMessageServer not supported" , ! pms::handler ); pms::handler = handler; } virtual void accepted(MessagingPort * p) { - assert( ! pms::grab ); - pms::grab = p; - boost::thread thr( pms::threadRun ); - while ( pms::grab ) - sleepmillis(1); + + if ( ! connTicketHolder.tryAcquire() ){ + log() << "connection refused because too many open connections: " << connTicketHolder.used() << endl; + + // TODO: would be nice if we notified them... + p->shutdown(); + delete p; + + sleepmillis(2); // otherwise we'll hard loop + return; + } + + try { + boost::thread thr( boost::bind( &pms::threadRun , p ) ); + } + catch ( boost::thread_resource_error& ){ + log() << "can't create new thread, closing connection" << endl; + + p->shutdown(); + delete p; + + sleepmillis(2); + } } + virtual void setAsTimeTracker(){ + Listener::setAsTimeTracker(); + } + void run(){ - assert( init() ); - listen(); + initAndListen(); } }; - MessageServer * createServer( int port , MessageHandler * handler ){ - return new PortMessageServer( port , handler ); + MessageServer * createServer( const MessageServer::Options& opts , MessageHandler * handler ){ + return new PortMessageServer( opts , handler ); } } diff --git a/util/miniwebserver.cpp b/util/miniwebserver.cpp index 61619d8..0193c5d 100644 --- a/util/miniwebserver.cpp +++ b/util/miniwebserver.cpp @@ -15,7 +15,7 @@ * limitations under the License. */ -#include "stdafx.h" +#include "pch.h" #include "miniwebserver.h" #include "hex.h" @@ -23,52 +23,22 @@ namespace mongo { - MiniWebServer::MiniWebServer() { - sock = 0; - } - - bool MiniWebServer::init(const string &ip, int _port) { - port = _port; - SockAddr me; - if ( ip.empty() ) - me = SockAddr( port ); - else - me = SockAddr( ip.c_str(), port ); - sock = ::socket(AF_INET, SOCK_STREAM, 0); - if ( sock == INVALID_SOCKET ) { - log() << "ERROR: MiniWebServer listen(): invalid socket? " << OUTPUT_ERRNO << endl; - return false; - } - prebindOptions( sock ); - if ( ::bind(sock, (sockaddr *) &me.sa, me.addressSize) != 0 ) { - log() << "MiniWebServer: bind() failed port:" << port << " " << OUTPUT_ERRNO << endl; - if ( errno == EADDRINUSE ) - log() << " addr already in use" << endl; - closesocket(sock); - return false; - } - - if ( ::listen(sock, 16) != 0 ) { - log() << "MiniWebServer: listen() failed " << OUTPUT_ERRNO << endl; - closesocket(sock); - return false; - } - - return true; - } + MiniWebServer::MiniWebServer(const string &ip, int port) + : Listener(ip, port, false) + {} string MiniWebServer::parseURL( const char * buf ) { - const char * urlStart = strstr( buf , " " ); + const char * urlStart = strchr( buf , ' ' ); if ( ! urlStart ) return "/"; urlStart++; - const char * end = strstr( urlStart , " " ); + const char * end = strchr( urlStart , ' ' ); if ( ! end ) { - end = strstr( urlStart , "\r" ); + end = strchr( urlStart , '\r' ); if ( ! end ) { - end = strstr( urlStart , "\n" ); + end = strchr( urlStart , '\n' ); } } @@ -105,14 +75,14 @@ namespace mongo { if ( eq == string::npos ) continue; - b.append( urlDecode(cur.substr(0,eq)).c_str() , urlDecode(cur.substr(eq+1) ) ); + b.append( urlDecode(cur.substr(0,eq)) , urlDecode(cur.substr(eq+1) ) ); } params = b.obj(); } string MiniWebServer::parseMethod( const char * headers ) { - const char * end = strstr( headers , " " ); + const char * end = strchr( headers , ' ' ); if ( ! end ) return "GET"; return string( headers , (int)(end-headers) ); @@ -139,17 +109,23 @@ namespace mongo { } void MiniWebServer::accepted(int s, const SockAddr &from) { + setSockTimeouts(s, 8); char buf[4096]; int len = 0; while ( 1 ) { - int x = ::recv(s, buf + len, sizeof(buf) - 1 - len, 0); + int left = sizeof(buf) - 1 - len; + if( left == 0 ) + break; + int x = ::recv(s, buf + len, left, 0); if ( x <= 0 ) { + closesocket(s); return; } len += x; buf[ len ] = 0; - if ( fullReceive( buf ) ) + if ( fullReceive( buf ) ) { break; + } } buf[len] = 0; @@ -178,18 +154,23 @@ namespace mongo { ss << "Content-Type: text/html\r\n"; } else { - for ( vector<string>::iterator i = headers.begin(); i != headers.end(); i++ ) + for ( vector<string>::iterator i = headers.begin(); i != headers.end(); i++ ) { + assert( strncmp("Content-Length", i->c_str(), 14) ); ss << *i << "\r\n"; + } } + ss << "Connection: close\r\n"; + ss << "Content-Length: " << responseMsg.size() << "\r\n"; ss << "\r\n"; ss << responseMsg; string response = ss.str(); ::send(s, response.c_str(), response.size(), 0); + closesocket(s); } string MiniWebServer::getHeader( const char * req , string wanted ){ - const char * headers = strstr( req , "\n" ); + const char * headers = strchr( req , '\n' ); if ( ! headers ) return ""; pcrecpp::StringPiece input( headers + 1 ); @@ -203,26 +184,6 @@ namespace mongo { } return ""; } - - void MiniWebServer::run() { - SockAddr from; - while ( ! inShutdown() ) { - int s = accept(sock, (sockaddr *) &from.sa, &from.addressSize); - if ( s < 0 ) { - if ( errno == ECONNABORTED ) { - log() << "Listener on port " << port << " aborted." << endl; - return; - } - log() << "MiniWebServer: accept() returns " << s << " " << OUTPUT_ERRNO << endl; - sleepmillis(200); - continue; - } - disableNagle(s); - RARELY log() << "MiniWebServer: connection accepted from " << from.toString() << endl; - accepted( s, from ); - closesocket(s); - } - } string MiniWebServer::urlDecode(const char* s){ stringstream out; diff --git a/util/miniwebserver.h b/util/miniwebserver.h index bdd2873..bbd1ba2 100644 --- a/util/miniwebserver.h +++ b/util/miniwebserver.h @@ -17,20 +17,17 @@ #pragma once -#include "../stdafx.h" +#include "../pch.h" #include "message.h" #include "../db/jsobj.h" namespace mongo { - class MiniWebServer { + class MiniWebServer : public Listener { public: - MiniWebServer(); + MiniWebServer(const string &ip, int _port); virtual ~MiniWebServer() {} - bool init(const string &ip, int _port); - void run(); - virtual void doRequest( const char *rq, // the full request string url, @@ -41,13 +38,13 @@ namespace mongo { const SockAddr &from ) = 0; - int socket() const { return sock; } + // --- static helpers ---- + + static void parseParams( BSONObj & params , string query ); - protected: - string parseURL( const char * buf ); - string parseMethod( const char * headers ); - string getHeader( const char * headers , string name ); - void parseParams( BSONObj & params , string query ); + static string parseURL( const char * buf ); + static string parseMethod( const char * headers ); + static string getHeader( const char * headers , string name ); static const char *body( const char *buf ); static string urlDecode(const char* s); @@ -56,9 +53,6 @@ namespace mongo { private: void accepted(int s, const SockAddr &from); static bool fullReceive( const char *buf ); - - int port; - int sock; }; } // namespace mongo diff --git a/util/mmap.cpp b/util/mmap.cpp index f6bbc73..b9c1994 100644 --- a/util/mmap.cpp +++ b/util/mmap.cpp @@ -15,36 +15,67 @@ * limitations under the License. */ -#include "stdafx.h" +#include "pch.h" #include "mmap.h" #include "processinfo.h" +#include "concurrency/rwlock.h" namespace mongo { - set<MemoryMappedFile*> mmfiles; - mongo::mutex mmmutex; + /*static*/ void MemoryMappedFile::updateLength( const char *filename, long &length ) { + if ( !boost::filesystem::exists( filename ) ) + return; + // make sure we map full length if preexisting file. + boost::uintmax_t l = boost::filesystem::file_size( filename ); + assert( l <= 0x7fffffff ); + length = (long) l; + } - MemoryMappedFile::~MemoryMappedFile() { - close(); - scoped_lock lk( mmmutex ); - mmfiles.erase(this); + void* MemoryMappedFile::map(const char *filename) { + boost::uintmax_t l = boost::filesystem::file_size( filename ); + assert( l <= 0x7fffffff ); + long i = (long)l; + return map( filename , i ); } - void MemoryMappedFile::created(){ - scoped_lock lk( mmmutex ); - mmfiles.insert(this); + void printMemInfo( const char * where ){ + cout << "mem info: "; + if ( where ) + cout << where << " "; + ProcessInfo pi; + if ( ! pi.supported() ){ + cout << " not supported" << endl; + return; + } + + cout << "vsize: " << pi.getVirtualMemorySize() << " resident: " << pi.getResidentSize() << " mapped: " << ( MemoryMappedFile::totalMappedLength() / ( 1024 * 1024 ) ) << endl; + } + + /* --- MongoFile ------------------------------------------------- + this is the administrative stuff + */ + + static set<MongoFile*> mmfiles; + static RWLock mmmutex("rw:mmmutex"); + + void MongoFile::destroyed() { + rwlock lk( mmmutex , true ); + mmfiles.erase(this); } /*static*/ - int closingAllFiles = 0; - void MemoryMappedFile::closeAllFiles( stringstream &message ) { + void MongoFile::closeAllFiles( stringstream &message ) { + static int closingAllFiles = 0; if ( closingAllFiles ) { message << "warning closingAllFiles=" << closingAllFiles << endl; return; } ++closingAllFiles; + + rwlock lk( mmmutex , true ); + ProgressMeter pm( mmfiles.size() , 2 , 1 ); - for ( set<MemoryMappedFile*>::iterator i = mmfiles.begin(); i != mmfiles.end(); i++ ){ + for ( set<MongoFile*>::iterator i = mmfiles.begin(); i != mmfiles.end(); i++ ){ (*i)->close(); pm.hit(); } @@ -52,59 +83,78 @@ namespace mongo { --closingAllFiles; } - long long MemoryMappedFile::totalMappedLength(){ + /*static*/ long long MongoFile::totalMappedLength(){ unsigned long long total = 0; - scoped_lock lk( mmmutex ); - for ( set<MemoryMappedFile*>::iterator i = mmfiles.begin(); i != mmfiles.end(); i++ ) + rwlock lk( mmmutex , false ); + for ( set<MongoFile*>::iterator i = mmfiles.begin(); i != mmfiles.end(); i++ ) total += (*i)->length(); return total; } - int MemoryMappedFile::flushAll( bool sync ){ - int num = 0; - - scoped_lock lk( mmmutex ); - for ( set<MemoryMappedFile*>::iterator i = mmfiles.begin(); i != mmfiles.end(); i++ ){ - num++; - MemoryMappedFile * mmf = *i; - if ( ! mmf ) - continue; - mmf->flush( sync ); + /*static*/ int MongoFile::flushAll( bool sync ){ + if ( ! sync ){ + int num = 0; + rwlock lk( mmmutex , false ); + for ( set<MongoFile*>::iterator i = mmfiles.begin(); i != mmfiles.end(); i++ ){ + num++; + MongoFile * mmf = *i; + if ( ! mmf ) + continue; + + mmf->flush( sync ); + } + return num; + } + + // want to do it sync + set<MongoFile*> seen; + while ( true ){ + auto_ptr<Flushable> f; + { + rwlock lk( mmmutex , false ); + for ( set<MongoFile*>::iterator i = mmfiles.begin(); i != mmfiles.end(); i++ ){ + MongoFile * mmf = *i; + if ( ! mmf ) + continue; + if ( seen.count( mmf ) ) + continue; + f.reset( mmf->prepareFlush() ); + seen.insert( mmf ); + break; + } + } + if ( ! f.get() ) + break; + + f->flush(); } - return num; + return seen.size(); } - - void MemoryMappedFile::updateLength( const char *filename, long &length ) { - if ( !boost::filesystem::exists( filename ) ) - return; - // make sure we map full length if preexisting file. - boost::uintmax_t l = boost::filesystem::file_size( filename ); - assert( l <= 0x7fffffff ); - length = (long) l; + void MongoFile::created(){ + rwlock lk( mmmutex , true ); + mmfiles.insert(this); } - void* MemoryMappedFile::map(const char *filename) { - boost::uintmax_t l = boost::filesystem::file_size( filename ); - assert( l <= 0x7fffffff ); - long i = (long)l; - return map( filename , i ); - } +#ifdef _DEBUG - void printMemInfo( const char * where ){ - cout << "mem info: "; - if ( where ) - cout << where << " "; - ProcessInfo pi; - if ( ! pi.supported() ){ - cout << " not supported" << endl; - return; + void MongoFile::lockAll() { + rwlock lk( mmmutex , false ); + for ( set<MongoFile*>::iterator i = mmfiles.begin(); i != mmfiles.end(); i++ ){ + MongoFile * mmf = *i; + if (mmf) mmf->_lock(); } - - cout << "vsize: " << pi.getVirtualMemorySize() << " resident: " << pi.getResidentSize() << " mapped: " << ( MemoryMappedFile::totalMappedLength() / ( 1024 * 1024 ) ) << endl; } + void MongoFile::unlockAll() { + rwlock lk( mmmutex , false ); + for ( set<MongoFile*>::iterator i = mmfiles.begin(); i != mmfiles.end(); i++ ){ + MongoFile * mmf = *i; + if (mmf) mmf->_unlock(); + } + } +#endif } // namespace mongo diff --git a/util/mmap.h b/util/mmap.h index c3133e4..f4875be 100644 --- a/util/mmap.h +++ b/util/mmap.h @@ -18,52 +18,182 @@ #pragma once namespace mongo { + + /* the administrative-ish stuff here */ + class MongoFile : boost::noncopyable { + + public: + /** Flushable has to fail nicely if the underlying object gets killed */ + class Flushable { + public: + virtual ~Flushable(){} + virtual void flush() = 0; + }; + + protected: + virtual void close() = 0; + virtual void flush(bool sync) = 0; + /** + * returns a thread safe object that you can call flush on + * Flushable has to fail nicely if the underlying object gets killed + */ + virtual Flushable * prepareFlush() = 0; + + void created(); /* subclass must call after create */ + void destroyed(); /* subclass must call in destructor */ + + // only supporting on posix mmap + virtual void _lock() {} + virtual void _unlock() {} - class MemoryMappedFile { public: + virtual ~MongoFile() {} + virtual long length() = 0; enum Options { - SEQUENTIAL = 1 + SEQUENTIAL = 1 // hint - e.g. FILE_FLAG_SEQUENTIAL_SCAN on windows + }; + + static int flushAll( bool sync ); // returns n flushed + static long long totalMappedLength(); + static void closeAllFiles( stringstream &message ); + + // Locking allows writes. Reads are always allowed + static void lockAll(); + static void unlockAll(); + + /* can be "overriden" if necessary */ + static bool exists(boost::filesystem::path p) { + return boost::filesystem::exists(p); + } + }; + +#ifndef _DEBUG + // no-ops in production + inline void MongoFile::lockAll() {} + inline void MongoFile::unlockAll() {} + +#endif + + struct MongoFileAllowWrites { + MongoFileAllowWrites(){ + MongoFile::lockAll(); + } + ~MongoFileAllowWrites(){ + MongoFile::unlockAll(); + } + }; + + /** template for what a new storage engine's class definition must implement + PRELIMINARY - subject to change. + */ + class StorageContainerTemplate : public MongoFile { + protected: + virtual void close(); + virtual void flush(bool sync); + public: + virtual long length(); + + /** pointer to a range of space in this storage unit */ + class Pointer { + public: + /** retried address of buffer at offset 'offset' withing the storage unit. returned range is a contiguous + buffer reflecting what is in storage. caller will not read or write past 'len'. + + note calls may be received that are at different points in a range and different lengths. however + for now assume that on writes, if a call is made, previously returned addresses are no longer valid. i.e. + p = at(10000, 500); + q = at(10000, 600); + after the second call it is ok if p is invalid. + */ + void* at(int offset, int len); + + /** indicate that we wrote to the range (from a previous at() call) and that it needs + flushing to disk. + */ + void written(int offset, int len); + + bool isNull() const; + }; + + /** commit written() calls from above. */ + void commit(); + + Pointer open(const char *filename); + Pointer open(const char *_filename, long &length, int options=0); + }; + + class MemoryMappedFile : public MongoFile { + public: + class Pointer { + char *_base; + public: + Pointer() : _base(0) { } + Pointer(void *p) : _base((char*) p) { } + void* at(int offset, int maxLen) { return _base + offset; } + void grow(int offset, int len) { /* no action required with mem mapped file */ } + bool isNull() const { return _base == 0; } }; MemoryMappedFile(); - ~MemoryMappedFile(); /* closes the file if open */ + ~MemoryMappedFile() { + destroyed(); + close(); + } void close(); - // Throws exception if file doesn't exist. + // Throws exception if file doesn't exist. (dm may2010: not sure if this is always true?) void* map( const char *filename ); + /*To replace map(): + + Pointer open( const char *filename ) { + void *p = map(filename); + uassert(13077, "couldn't open/map file", p); + return Pointer(p); + }*/ + /* Creates with length if DNE, otherwise uses existing file length, passed length. */ void* map(const char *filename, long &length, int options = 0 ); void flush(bool sync); + virtual Flushable * prepareFlush(); - void* viewOfs() { + /*void* viewOfs() { return view; - } + }*/ long length() { return len; } - static void updateLength( const char *filename, long &length ); - - static long long totalMappedLength(); - static void closeAllFiles( stringstream &message ); - static int flushAll( bool sync ); - private: - void created(); + static void updateLength( const char *filename, long &length ); HANDLE fd; HANDLE maphandle; void *view; long len; string _filename; + + protected: + // only posix mmap implementations will support this + virtual void _lock(); + virtual void _unlock(); + }; void printMemInfo( const char * where ); +#include "ramstore.h" + +//#define _RAMSTORE +#if defined(_RAMSTORE) + typedef RamStoreFile MMF; +#else + typedef MemoryMappedFile MMF; +#endif + } // namespace mongo diff --git a/util/mmap_mm.cpp b/util/mmap_mm.cpp index 9cffad5..3cbb0d2 100644 --- a/util/mmap_mm.cpp +++ b/util/mmap_mm.cpp @@ -1,4 +1,4 @@ -// mmap_mm.cpp +// mmap_mm.cpp - in memory (no file) version /* Copyright 2009 10gen Inc. * @@ -15,7 +15,7 @@ * limitations under the License. */ -#include "stdafx.h" +#include "pch.h" #include "mmap.h" /* in memory (no file) version */ @@ -45,6 +45,8 @@ namespace mongo { void MemoryMappedFile::flush(bool sync) { } + void MemoryMappedFile::_lock() {} + void MemoryMappedFile::_unlock() {} } diff --git a/util/mmap_posix.cpp b/util/mmap_posix.cpp index a5caf8c..af1592c 100644 --- a/util/mmap_posix.cpp +++ b/util/mmap_posix.cpp @@ -15,9 +15,10 @@ * limitations under the License. */ -#include "stdafx.h" +#include "pch.h" #include "mmap.h" #include "file_allocator.h" +#include "../db/concurrency.h" #include <errno.h> #include <sys/mman.h> @@ -60,7 +61,7 @@ namespace mongo { fd = open(filename, O_RDWR | O_NOATIME); if ( fd <= 0 ) { - out() << "couldn't open " << filename << ' ' << OUTPUT_ERRNO << endl; + out() << "couldn't open " << filename << ' ' << errnoWithDescription() << endl; return 0; } @@ -74,7 +75,7 @@ namespace mongo { view = mmap(NULL, length, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0); if ( view == MAP_FAILED ) { - out() << " mmap() failed for " << filename << " len:" << length << " " << OUTPUT_ERRNO << endl; + out() << " mmap() failed for " << filename << " len:" << length << " " << errnoWithDescription() << endl; if ( errno == ENOMEM ){ out() << " mmap failed with out of memory, if you're using 32-bits, then you probably need to upgrade to 64" << endl; } @@ -86,10 +87,15 @@ namespace mongo { #else if ( options & SEQUENTIAL ){ if ( madvise( view , length , MADV_SEQUENTIAL ) ){ - out() << " madvise failed for " << filename << " " << OUTPUT_ERRNO << endl; + out() << " madvise failed for " << filename << " " << errnoWithDescription() << endl; } } #endif + + DEV if (! dbMutex.info().isLocked()){ + _unlock(); + } + return view; } @@ -97,9 +103,38 @@ namespace mongo { if ( view == 0 || fd == 0 ) return; if ( msync(view, len, sync ? MS_SYNC : MS_ASYNC) ) - problem() << "msync " << OUTPUT_ERRNO << endl; + problem() << "msync " << errnoWithDescription() << endl; } + class PosixFlushable : public MemoryMappedFile::Flushable { + public: + PosixFlushable( void * view , HANDLE fd , long len ) + : _view( view ) , _fd( fd ) , _len(len){ + } + + void flush(){ + if ( _view && _fd ) + if ( msync(_view, _len, MS_SYNC ) ) + problem() << "msync " << errnoWithDescription() << endl; + + } + + void * _view; + HANDLE _fd; + long _len; + }; + + MemoryMappedFile::Flushable * MemoryMappedFile::prepareFlush(){ + return new PosixFlushable( view , fd , len ); + } + + void MemoryMappedFile::_lock() { + if (view) assert(mprotect(view, len, PROT_READ | PROT_WRITE) == 0); + } + + void MemoryMappedFile::_unlock() { + if (view) assert(mprotect(view, len, PROT_READ) == 0); + } } // namespace mongo diff --git a/util/mmap_win.cpp b/util/mmap_win.cpp index 6168d9d..97e1589 100644 --- a/util/mmap_win.cpp +++ b/util/mmap_win.cpp @@ -15,8 +15,9 @@ * limitations under the License. */ -#include "stdafx.h" +#include "pch.h" #include "mmap.h" +#include "text.h" #include <windows.h> namespace mongo { @@ -40,12 +41,6 @@ namespace mongo { CloseHandle(fd); fd = 0; } - - std::wstring toWideString(const char *s) { - std::basic_ostringstream<TCHAR> buf; - buf << s; - return buf.str(); - } unsigned mapped = 0; @@ -68,14 +63,14 @@ namespace mongo { } updateLength( filename, length ); - std::wstring filenamew = toWideString(filename); DWORD createOptions = FILE_ATTRIBUTE_NORMAL; if ( options & SEQUENTIAL ) createOptions |= FILE_FLAG_SEQUENTIAL_SCAN; fd = CreateFile( - filenamew.c_str(), GENERIC_WRITE | GENERIC_READ, FILE_SHARE_READ, + toNativeString(filename).c_str(), + GENERIC_WRITE | GENERIC_READ, FILE_SHARE_READ, NULL, OPEN_ALWAYS, createOptions , NULL); if ( fd == INVALID_HANDLE_VALUE ) { out() << "Create/OpenFile failed " << filename << ' ' << GetLastError() << endl; @@ -92,7 +87,7 @@ namespace mongo { view = MapViewOfFile(maphandle, FILE_MAP_ALL_ACCESS, 0, 0, 0); if ( view == 0 ) { - out() << "MapViewOfFile failed " << filename << " " << OUTPUT_ERRNO << " "; + out() << "MapViewOfFile failed " << filename << " " << errnoWithDescription() << " "; out() << GetLastError(); out() << endl; } @@ -100,21 +95,47 @@ namespace mongo { return view; } + class WindowsFlushable : public MemoryMappedFile::Flushable { + public: + WindowsFlushable( void * view , HANDLE fd , string filename ) + : _view(view) , _fd(fd) , _filename(filename){ + + } + + void flush(){ + if (!_view || !_fd) + return; + + bool success = FlushViewOfFile(_view, 0); // 0 means whole mapping + if (!success){ + int err = GetLastError(); + out() << "FlushViewOfFile failed " << err << " file: " << _filename << endl; + } + + success = FlushFileBuffers(_fd); + if (!success){ + int err = GetLastError(); + out() << "FlushFileBuffers failed " << err << " file: " << _filename << endl; + } + } + + void * _view; + HANDLE _fd; + string _filename; + + }; + void MemoryMappedFile::flush(bool sync) { uassert(13056, "Async flushing not supported on windows", sync); + + WindowsFlushable f( view , fd , _filename ); + f.flush(); + } - if (!view || !fd) return; - - bool success = FlushViewOfFile(view, 0); // 0 means whole mapping - if (!success){ - int err = GetLastError(); - out() << "FlushViewOfFile failed " << err << " file: " << _filename << endl; - } - - success = FlushFileBuffers(fd); - if (!success){ - int err = GetLastError(); - out() << "FlushFileBuffers failed " << err << " file: " << _filename << endl; - } + MemoryMappedFile::Flushable * MemoryMappedFile::prepareFlush(){ + return new WindowsFlushable( view , fd , _filename ); } + void MemoryMappedFile::_lock() {} + void MemoryMappedFile::_unlock() {} + } diff --git a/util/mongoutils/README b/util/mongoutils/README new file mode 100755 index 0000000..d3d874b --- /dev/null +++ b/util/mongoutils/README @@ -0,0 +1,7 @@ +mongoutils namespace requirements: + +(1) code is not database specific, rather, true utilities +(2) are cross platform +(3) may require boost headers, but not libs +(4) are clean and easy to use in any c++ project without pulling in lots of other stuff +(5) apache license diff --git a/util/mongoutils/checksum.h b/util/mongoutils/checksum.h new file mode 100644 index 0000000..6beb7f4 --- /dev/null +++ b/util/mongoutils/checksum.h @@ -0,0 +1,32 @@ +/** @checksum.h */ + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +namespace mongoutils { + + /** + * this is a silly temporary implementation + */ + inline int checksum( const char* x , int size ){ + int ck = 0; + for ( int i=0; i<size; i++ ) + ck += ( (int)x[i] * ( i + 1 ) ); + return ck; + } + +} diff --git a/util/mongoutils/html.h b/util/mongoutils/html.h new file mode 100644 index 0000000..e8502ec --- /dev/null +++ b/util/mongoutils/html.h @@ -0,0 +1,158 @@ +// @file html.h + +#pragma once + +/* Things in the mongoutils namespace + (1) are not database specific, rather, true utilities + (2) are cross platform + (3) may require boost headers, but not libs + (4) are clean and easy to use in any c++ project without pulling in lots of other stuff +*/ + +/* Copyright 2010 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <sstream> + +namespace mongoutils { + + namespace html { + + using namespace std; + + inline string _end() { return "</body></html>"; } + inline string _table() { return "</table>\n\n"; } + inline string _tr() { return "</tr>\n"; } + + inline string tr() { return "<tr>"; } + inline string tr(string a, string b) { + stringstream ss; + ss << "<tr><td>" << a << "</td><td>" << b << "</td></tr>\n"; + return ss.str(); + } + template <class T> + inline string td(T x) { + stringstream ss; + ss << "<td>" << x << "</td>"; + return ss.str(); + } + inline string td(string x) { + return "<td>" + x + "</td>"; + } + inline string th(string x) { + return "<th>" + x + "</th>"; + } + + inline void tablecell( stringstream& ss , bool b ){ + ss << "<td>" << (b ? "<b>X</b>" : "") << "</td>"; + } + + template< typename T> + inline void tablecell( stringstream& ss , const T& t ){ + ss << "<td>" << t << "</td>"; + } + + inline string table(const char *headers[] = 0, bool border = true) { + stringstream ss; + ss << "\n<table " + << (border?"border=1 ":"") + << "cellpadding=2 cellspacing=0>\n"; + if( headers ) { + ss << "<tr>"; + while( *headers ) { + ss << "<th>" << *headers << "</th>"; + headers++; + } + ss << "</tr>\n"; + } + return ss.str(); + } + + inline string start(string title) { + stringstream ss; + ss << "<html><head>\n<title>"; + ss << title; + ss << "</title>\n"; + + ss << "<style type=\"text/css\" media=\"screen\">" + "body { font-family: helvetica, arial, san-serif }\n" + "table { border-collapse:collapse; border-color:#999; margin-top:.5em }\n" + "th { background-color:#bbb; color:#000 }\n" + "td,th { padding:.25em }\n" + "</style>\n"; + + ss << "</head>\n<body>\n"; + return ss.str(); + } + + inline string red(string contentHtml, bool color=true) { + if( !color ) return contentHtml; + stringstream ss; + ss << "<span style=\"color:#A00;\">" << contentHtml << "</span>"; + return ss.str(); + } + inline string grey(string contentHtml, bool color=true) { + if( !color ) return contentHtml; + stringstream ss; + ss << "<span style=\"color:#888;\">" << contentHtml << "</span>"; + return ss.str(); + } + inline string blue(string contentHtml, bool color=true) { + if( !color ) return contentHtml; + stringstream ss; + ss << "<span style=\"color:#00A;\">" << contentHtml << "</span>"; + return ss.str(); + } + inline string yellow(string contentHtml, bool color=true) { + if( !color ) return contentHtml; + stringstream ss; + ss << "<span style=\"color:#A80;\">" << contentHtml << "</span>"; + return ss.str(); + } + inline string green(string contentHtml, bool color=true) { + if( !color ) return contentHtml; + stringstream ss; + ss << "<span style=\"color:#0A0;\">" << contentHtml << "</span>"; + return ss.str(); + } + + inline string p(string contentHtml) { + stringstream ss; + ss << "<p>" << contentHtml << "</p>\n"; + return ss.str(); + } + + inline string h2(string contentHtml) { + stringstream ss; + ss << "<h2>" << contentHtml << "</h2>\n"; + return ss.str(); + } + + /* does NOT escape the strings. */ + inline string a(string href, string title="", string contentHtml = "") { + stringstream ss; + ss << "<a"; + if( !href.empty() ) ss << " href=\"" << href << '"'; + if( !title.empty() ) ss << " title=\"" << title << '"'; + ss << '>'; + if( !contentHtml.empty() ) { + ss << contentHtml << "</a>"; + } + return ss.str(); + } + + } + +} diff --git a/util/mongoutils/mongoutils.vcxproj b/util/mongoutils/mongoutils.vcxproj new file mode 100755 index 0000000..f8919cd --- /dev/null +++ b/util/mongoutils/mongoutils.vcxproj @@ -0,0 +1,73 @@ +<?xml version="1.0" encoding="utf-8"?>
+<Project DefaultTargets="Build" ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <ItemGroup Label="ProjectConfigurations">
+ <ProjectConfiguration Include="Debug|Win32">
+ <Configuration>Debug</Configuration>
+ <Platform>Win32</Platform>
+ </ProjectConfiguration>
+ <ProjectConfiguration Include="Release|Win32">
+ <Configuration>Release</Configuration>
+ <Platform>Win32</Platform>
+ </ProjectConfiguration>
+ </ItemGroup>
+ <PropertyGroup Label="Globals">
+ <ProjectGuid>{7B84584E-92BC-4DB9-971B-A1A8F93E5053}</ProjectGuid>
+ <RootNamespace>mongoutils</RootNamespace>
+ <ProjectName>mongoutils test program</ProjectName>
+ </PropertyGroup>
+ <Import Project="$(VCTargetsPath)\Microsoft.Cpp.Default.props" />
+ <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'" Label="Configuration">
+ <ConfigurationType>Application</ConfigurationType>
+ <UseDebugLibraries>true</UseDebugLibraries>
+ <CharacterSet>MultiByte</CharacterSet>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'" Label="Configuration">
+ <ConfigurationType>Application</ConfigurationType>
+ <UseDebugLibraries>false</UseDebugLibraries>
+ <WholeProgramOptimization>true</WholeProgramOptimization>
+ <CharacterSet>MultiByte</CharacterSet>
+ </PropertyGroup>
+ <Import Project="$(VCTargetsPath)\Microsoft.Cpp.props" />
+ <ImportGroup Label="ExtensionSettings">
+ </ImportGroup>
+ <ImportGroup Label="PropertySheets" Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
+ <Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
+ </ImportGroup>
+ <ImportGroup Label="PropertySheets" Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
+ <Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
+ </ImportGroup>
+ <PropertyGroup Label="UserMacros" />
+ <PropertyGroup />
+ <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
+ <ClCompile>
+ <WarningLevel>Level3</WarningLevel>
+ <Optimization>Disabled</Optimization>
+ </ClCompile>
+ <Link>
+ <GenerateDebugInformation>true</GenerateDebugInformation>
+ </Link>
+ </ItemDefinitionGroup>
+ <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
+ <ClCompile>
+ <WarningLevel>Level3</WarningLevel>
+ <Optimization>MaxSpeed</Optimization>
+ <FunctionLevelLinking>true</FunctionLevelLinking>
+ <IntrinsicFunctions>true</IntrinsicFunctions>
+ </ClCompile>
+ <Link>
+ <GenerateDebugInformation>true</GenerateDebugInformation>
+ <EnableCOMDATFolding>true</EnableCOMDATFolding>
+ <OptimizeReferences>true</OptimizeReferences>
+ </Link>
+ </ItemDefinitionGroup>
+ <ItemGroup>
+ <ClCompile Include="test.cpp" />
+ </ItemGroup>
+ <ItemGroup>
+ <ClInclude Include="html.h" />
+ <ClInclude Include="str.h" />
+ </ItemGroup>
+ <Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
+ <ImportGroup Label="ExtensionTargets">
+ </ImportGroup>
+</Project>
\ No newline at end of file diff --git a/util/mongoutils/mongoutils.vcxproj.filters b/util/mongoutils/mongoutils.vcxproj.filters new file mode 100755 index 0000000..84ecbff --- /dev/null +++ b/util/mongoutils/mongoutils.vcxproj.filters @@ -0,0 +1,10 @@ +<?xml version="1.0" encoding="utf-8"?>
+<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <ItemGroup>
+ <ClCompile Include="test.cpp" />
+ </ItemGroup>
+ <ItemGroup>
+ <ClInclude Include="html.h" />
+ <ClInclude Include="str.h" />
+ </ItemGroup>
+</Project>
\ No newline at end of file diff --git a/util/mongoutils/str.h b/util/mongoutils/str.h new file mode 100644 index 0000000..2028264 --- /dev/null +++ b/util/mongoutils/str.h @@ -0,0 +1,118 @@ +// @file str.h + +/* Copyright 2010 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +/* Things in the mongoutils namespace + (1) are not database specific, rather, true utilities + (2) are cross platform + (3) may require boost headers, but not libs + (4) are clean and easy to use in any c++ project without pulling in lots of other stuff + + Note: within this module, we use int for all offsets -- there are no unsigned offsets + and no size_t's. If you need 3 gigabyte long strings, don't use this module. +*/ + +#include <string> +#include <sstream> + +namespace mongoutils { + + namespace str { + + using namespace std; + + /** the idea here is to make one liners easy. e.g.: + + return str::stream() << 1 << ' ' << 2; + + since the following doesn't work: + + (stringstream() << 1).str(); + */ + class stream { + public: + stringstream ss; + + template<class T> + stream& operator<<(const T& v) { + ss << v; + return *this; + } + + operator std::string () const { return ss.str(); } + }; + + inline bool startsWith(const char *str, const char *prefix) { + const char *s = str; + const char *p = prefix; + while( *p ) { + if( *p != *s ) return false; + p++; s++; + } + return true; + } + inline bool startsWith(string s, string p) { return startsWith(s.c_str(), p.c_str()); } + + inline bool endsWith(string s, string p) { + int l = p.size(); + int x = s.size(); + if( x < l ) return false; + return strncmp(s.c_str()+x-l, p.c_str(), l) == 0; + } + + /** find char x, and return rest of string thereafter, or "" if not found */ + inline const char * after(const char *s, char x) { + const char *p = strchr(s, x); + return (p != 0) ? p+1 : ""; } + inline string after(const string& s, char x) { + const char *p = strchr(s.c_str(), x); + return (p != 0) ? string(p+1) : ""; } + + inline const char * after(const char *s, const char *x) { + const char *p = strstr(s, x); + return (p != 0) ? p+strlen(x) : ""; } + inline string after(string s, string x) { + const char *p = strstr(s.c_str(), x.c_str()); + return (p != 0) ? string(p+x.size()) : ""; } + + inline bool contains(string s, string x) { + return strstr(s.c_str(), x.c_str()) != 0; } + + /** @return everything befor the character x, else entire string */ + inline string before(const string& s, char x) { + const char *p = strchr(s.c_str(), x); + return (p != 0) ? s.substr(0, p-s.c_str()) : s; } + + /** check if if strings share a common starting prefix + @return offset of divergence (or length if equal). 0=nothing in common. */ + inline int shareCommonPrefix(const char *p, const char *q) { + int ofs = 0; + while( 1 ) { + if( *p == 0 || *q == 0 ) + break; + if( *p != *q ) + break; + p++; q++; ofs++; + } + return ofs; } + inline int shareCommonPrefix(const string &a, const string &b) + { return shareCommonPrefix(a.c_str(), b.c_str()); } + + } + +} diff --git a/util/mongoutils/test.cpp b/util/mongoutils/test.cpp new file mode 100755 index 0000000..0420624 --- /dev/null +++ b/util/mongoutils/test.cpp @@ -0,0 +1,34 @@ +/* @file test.cpp
+ utils/mongoutils/test.cpp
+ unit tests for mongoutils
+*/
+
+/*
+ * Copyright 2010 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "str.h"
+#include "html.h"
+#include <assert.h>
+
+using namespace std;
+using namespace mongoutils;
+
+int main() {
+ string x = str::after("abcde", 'c');
+ assert( x == "de" );
+ assert( str::after("abcde", 'x') == "" );
+ return 0;
+}
diff --git a/util/ntservice.cpp b/util/ntservice.cpp index 251be92..fe4ae44 100644 --- a/util/ntservice.cpp +++ b/util/ntservice.cpp @@ -15,8 +15,10 @@ * limitations under the License. */ -#include "stdafx.h" +#include "pch.h" #include "ntservice.h" +#include "winutil.h" +#include "text.h" #include <direct.h> #if defined(_WIN32) @@ -25,14 +27,14 @@ namespace mongo { void shutdown(); - SERVICE_STATUS_HANDLE ServiceController::_statusHandle = null; + SERVICE_STATUS_HANDLE ServiceController::_statusHandle = NULL; std::wstring ServiceController::_serviceName; - ServiceCallback ServiceController::_serviceCallback = null; + ServiceCallback ServiceController::_serviceCallback = NULL; ServiceController::ServiceController() { } - bool ServiceController::installService( const std::wstring& serviceName, const std::wstring& displayName, const std::wstring& serviceDesc, int argc, char* argv[] ) { + bool ServiceController::installService( const std::wstring& serviceName, const std::wstring& displayName, const std::wstring& serviceDesc, const std::wstring& serviceUser, const std::wstring& servicePassword, const std::string dbpath, int argc, char* argv[] ) { assert(argc >= 1); stringstream commandLine; @@ -46,38 +48,81 @@ namespace mongo { } for ( int i = 1; i < argc; i++ ) { - std::string arg( argv[ i ] ); - - // replace install command to indicate process is being started as a service - if ( arg == "--install" ) - arg = "--service"; - - commandLine << arg << " "; - } - - SC_HANDLE schSCManager = ::OpenSCManager( null, null, SC_MANAGER_ALL_ACCESS ); - if ( schSCManager == null ) + std::string arg( argv[ i ] ); + // replace install command to indicate process is being started as a service + if ( arg == "--install" || arg == "--reinstall" ) { + arg = "--service"; + } else if ( arg == "--dbpath" && i + 1 < argc ) { + commandLine << arg << " \"" << dbpath << "\" "; + i++; + continue; + } else if ( arg.length() > 9 && arg.substr(0, 9) == "--service" ) { + // Strip off --service(Name|User|Password) arguments + continue; + } + commandLine << arg << " "; + } + + SC_HANDLE schSCManager = ::OpenSCManager( NULL, NULL, SC_MANAGER_ALL_ACCESS ); + if ( schSCManager == NULL ) { + DWORD err = ::GetLastError(); + cerr << "Error connecting to the Service Control Manager: " << GetWinErrMsg(err) << endl; + return false; + } + + // Make sure servise doesn't already exist. + // TODO: Check to see if service is in "Deleting" status, suggest the user close down Services MMC snap-ins. + SC_HANDLE schService = ::OpenService( schSCManager, serviceName.c_str(), SERVICE_ALL_ACCESS ); + if ( schService != NULL ) { + cerr << "There is already a service named " << toUtf8String(serviceName) << ". Aborting" << endl; + ::CloseServiceHandle( schService ); + ::CloseServiceHandle( schSCManager ); return false; - + } std::basic_ostringstream< TCHAR > commandLineWide; - commandLineWide << commandLine.str().c_str(); + commandLineWide << commandLine.str().c_str(); + + cerr << "Creating service " << toUtf8String(serviceName) << "." << endl; // create new service - SC_HANDLE schService = ::CreateService( schSCManager, serviceName.c_str(), displayName.c_str(), + schService = ::CreateService( schSCManager, serviceName.c_str(), displayName.c_str(), SERVICE_ALL_ACCESS, SERVICE_WIN32_OWN_PROCESS, SERVICE_AUTO_START, SERVICE_ERROR_NORMAL, - commandLineWide.str().c_str(), null, null, L"\0\0", null, null ); + commandLineWide.str().c_str(), NULL, NULL, L"\0\0", NULL, NULL ); + if ( schService == NULL ) { + DWORD err = ::GetLastError(); + cerr << "Error creating service: " << GetWinErrMsg(err) << endl; + ::CloseServiceHandle( schSCManager ); + return false; + } - if ( schService == null ) { - ::CloseServiceHandle( schSCManager ); - return false; - } + cerr << "Service creation successful." << endl; + cerr << "Service can be started from the command line via 'net start \"" << toUtf8String(serviceName) << "\"'." << endl; + + bool serviceInstalled; + + // TODO: If neccessary grant user "Login as a Service" permission. + if ( !serviceUser.empty() ) { + std::wstring actualServiceUser; + if ( serviceUser.find(L"\\") == string::npos ) { + actualServiceUser = L".\\" + serviceUser; + } + else { + actualServiceUser = serviceUser; + } + cerr << "Setting service login credentials. User: " << toUtf8String(actualServiceUser) << endl; + serviceInstalled = ::ChangeServiceConfig( schService, SERVICE_NO_CHANGE, SERVICE_NO_CHANGE, SERVICE_NO_CHANGE, NULL, NULL, NULL, NULL, actualServiceUser.c_str(), servicePassword.c_str(), NULL ); + if ( !serviceInstalled ) { + cerr << "Setting service login failed. Service has 'LocalService' permissions." << endl; + } + } + + // set the service description SERVICE_DESCRIPTION serviceDescription; serviceDescription.lpDescription = (LPTSTR)serviceDesc.c_str(); - - // set new service description - bool serviceInstalled = ::ChangeServiceConfig2( schService, SERVICE_CONFIG_DESCRIPTION, &serviceDescription ); + serviceInstalled = ::ChangeServiceConfig2( schService, SERVICE_CONFIG_DESCRIPTION, &serviceDescription ); + if ( serviceInstalled ) { SC_ACTION aActions[ 3 ] = { { SC_ACTION_RESTART, 0 }, { SC_ACTION_RESTART, 0 }, { SC_ACTION_RESTART, 0 } }; @@ -89,8 +134,12 @@ namespace mongo { // set service recovery options serviceInstalled = ::ChangeServiceConfig2( schService, SERVICE_CONFIG_FAILURE_ACTIONS, &serviceFailure ); + } - + else { + cerr << "Could not set service description. Check the event log for more details." << endl; + } + ::CloseServiceHandle( schService ); ::CloseServiceHandle( schSCManager ); @@ -98,13 +147,16 @@ namespace mongo { } bool ServiceController::removeService( const std::wstring& serviceName ) { - SC_HANDLE schSCManager = ::OpenSCManager( null, null, SC_MANAGER_ALL_ACCESS ); - if ( schSCManager == null ) - return false; + SC_HANDLE schSCManager = ::OpenSCManager( NULL, NULL, SC_MANAGER_ALL_ACCESS ); + if ( schSCManager == NULL ) { + DWORD err = ::GetLastError(); + cerr << "Error connecting to the Service Control Manager: " << GetWinErrMsg(err) << endl; + return false; + } SC_HANDLE schService = ::OpenService( schSCManager, serviceName.c_str(), SERVICE_ALL_ACCESS ); - - if ( schService == null ) { + if ( schService == NULL ) { + cerr << "Could not find a service named " << toUtf8String(serviceName) << " to uninstall." << endl; ::CloseServiceHandle( schSCManager ); return false; } @@ -113,20 +165,30 @@ namespace mongo { // stop service if its running if ( ::ControlService( schService, SERVICE_CONTROL_STOP, &serviceStatus ) ) { + cerr << "Service " << toUtf8String(serviceName) << " is currently running. Stopping service." << endl; while ( ::QueryServiceStatus( schService, &serviceStatus ) ) { if ( serviceStatus.dwCurrentState == SERVICE_STOP_PENDING ) - { - Sleep( 1000 ); - } - else { break; } + { + Sleep( 1000 ); + } + else { break; } } + cerr << "Service stopped." << endl; } + cerr << "Deleting service " << toUtf8String(serviceName) << "." << endl; bool serviceRemoved = ::DeleteService( schService ); ::CloseServiceHandle( schService ); ::CloseServiceHandle( schSCManager ); + if (serviceRemoved) { + cerr << "Service deleted successfully." << endl; + } + else { + cerr << "Failed to delete service." << endl; + } + return serviceRemoved; } @@ -136,14 +198,14 @@ namespace mongo { SERVICE_TABLE_ENTRY dispTable[] = { { (LPTSTR)serviceName.c_str(), (LPSERVICE_MAIN_FUNCTION)ServiceController::initService }, - { null, null } + { NULL, NULL } }; return StartServiceCtrlDispatcher( dispTable ); } bool ServiceController::reportStatus( DWORD reportState, DWORD waitHint ) { - if ( _statusHandle == null ) + if ( _statusHandle == NULL ) return false; static DWORD checkPoint = 1; diff --git a/util/ntservice.h b/util/ntservice.h index 00b8a0a..271e7d7 100644 --- a/util/ntservice.h +++ b/util/ntservice.h @@ -29,7 +29,7 @@ namespace mongo { ServiceController(); virtual ~ServiceController() {} - static bool installService( const std::wstring& serviceName, const std::wstring& displayName, const std::wstring& serviceDesc, int argc, char* argv[] ); + static bool installService( const std::wstring& serviceName, const std::wstring& displayName, const std::wstring& serviceDesc, const std::wstring& serviceUser, const std::wstring& servicePassword, const std::string dbpath, int argc, char* argv[] ); static bool removeService( const std::wstring& serviceName ); static bool startService( const std::wstring& serviceName, ServiceCallback startService ); static bool reportStatus( DWORD reportState, DWORD waitHint = 0 ); diff --git a/util/optime.h b/util/optime.h index 8b26434..4645968 100644 --- a/util/optime.h +++ b/util/optime.h @@ -20,15 +20,20 @@ #include "../db/concurrency.h" namespace mongo { - void exitCleanly( int code ); + void exitCleanly( ExitCode code ); - /* Operation sequence #. A combination of current second plus an ordinal value. - */ struct ClockSkewException : public DBException { - virtual const char* what() const throw() { return "clock skew exception"; } - virtual int getCode(){ return 20001; } + ClockSkewException() : DBException( "clock skew exception" , 20001 ){} }; - + + /* replsets use RSOpTime. + M/S uses OpTime. + But this is useable from both. + */ + typedef unsigned long long ReplTime; + + /* Operation sequence #. A combination of current second plus an ordinal value. + */ #pragma pack(4) class OpTime { unsigned i; @@ -44,8 +49,8 @@ namespace mongo { OpTime(Date_t date) { reinterpret_cast<unsigned long long&>(*this) = date.millis; } - OpTime(unsigned long long date) { - reinterpret_cast<unsigned long long&>(*this) = date; + OpTime(ReplTime x) { + reinterpret_cast<unsigned long long&>(*this) = x; } OpTime(unsigned a, unsigned b) { secs = a; @@ -87,14 +92,14 @@ namespace mongo { bytes of overhead. */ unsigned long long asDate() const { - return *((unsigned long long *) &i); + return reinterpret_cast<const unsigned long long*>(&i)[0]; } - // unsigned long long& asDate() { return *((unsigned long long *) &i); } - - bool isNull() { - return secs == 0; + long long asLL() const { + return reinterpret_cast<const long long*>(&i)[0]; } + bool isNull() const { return secs == 0; } + string toStringLong() const { char buf[64]; time_t_to_String(secs, buf); @@ -104,12 +109,18 @@ namespace mongo { return ss.str(); } + string toStringPretty() const { + stringstream ss; + ss << time_t_to_String_short(secs) << ':' << hex << i; + return ss.str(); + } + string toString() const { stringstream ss; ss << hex << secs << ':' << i; return ss.str(); } - operator string() const { return toString(); } + bool operator==(const OpTime& r) const { return i == r.i && secs == r.secs; } @@ -121,6 +132,15 @@ namespace mongo { return secs < r.secs; return i < r.i; } + bool operator<=(const OpTime& r) const { + return *this < r || *this == r; + } + bool operator>(const OpTime& r) const { + return !(*this <= r); + } + bool operator>=(const OpTime& r) const { + return !(*this < r); + } }; #pragma pack() diff --git a/util/password.cpp b/util/password.cpp new file mode 100644 index 0000000..3037da1 --- /dev/null +++ b/util/password.cpp @@ -0,0 +1,92 @@ +/* + * Copyright 2010 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "pch.h" +#include "password.h" +#include <iostream> + +#ifndef _WIN32 +#include <termios.h> +#endif + +using namespace std; + +namespace mongo { + + string askPassword() { + + std::string password; + cout << "Enter password: "; +#ifndef _WIN32 + const int stdinfd = 0; + termios termio; + tcflag_t old = 0; + if ( isatty( stdinfd ) ) { + int i = tcgetattr( stdinfd, &termio ); + if( i == -1 ) { + cerr << "Cannot get terminal attributes " << errnoWithDescription() << endl; + return string(); + } + old = termio.c_lflag; + termio.c_lflag &= ~ECHO; + i = tcsetattr( stdinfd, TCSANOW, &termio ); + if( i == -1 ) { + cerr << "Cannot set terminal attributes " << errnoWithDescription() << endl; + return string(); + } + } + + cin >> password; + + if ( isatty( stdinfd ) ) { + termio.c_lflag = old; + int i = tcsetattr( stdinfd, TCSANOW, &termio ); + if( i == -1 ) { + cerr << "Cannot set terminal attributes " << errnoWithDescription() << endl; + return string(); + } + } +#else + HANDLE stdinh = GetStdHandle( STD_INPUT_HANDLE ); + if ( stdinh == INVALID_HANDLE_VALUE) { + cerr << "Cannot get stdin handle " << GetLastError() << "\n"; + return string(); + } + + DWORD old; + if ( !GetConsoleMode( stdinh, &old ) ) { + cerr << "Cannot get console mode " << GetLastError() << "\n"; + return string(); + } + + DWORD noecho = ENABLE_LINE_INPUT | ENABLE_PROCESSED_INPUT; + if ( !SetConsoleMode( stdinh, noecho ) ) { + cerr << "Cannot set console mode " << GetLastError() << "\n"; + return string(); + } + + cin >> password; + + if ( !SetConsoleMode( stdinh, old ) ) { + cerr << "Cannot set console mode " << GetLastError() << "\n"; + return string(); + } + cin.get(); +#endif + cout << "\n"; + return password; + } +} diff --git a/util/password.h b/util/password.h new file mode 100644 index 0000000..18294b2 --- /dev/null +++ b/util/password.h @@ -0,0 +1,61 @@ +/* + * Copyright 2010 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#pragma once + +#include <boost/program_options.hpp> +#include <string> + +namespace mongo { + + struct PasswordValue : public boost::program_options::typed_value<std::string> { + + PasswordValue( std::string* val ) + : boost::program_options::typed_value<std::string>( val ) { } + + unsigned min_tokens() const { + return 0; + } + + unsigned max_tokens() const { + return 1; + } + + bool is_required() const { + return false; + } + + void xparse( boost::any& value_store, + const std::vector<std::string>& new_tokens ) const { + if ( !value_store.empty() ) +#if BOOST_VERSION >= 104200 + boost::throw_exception( boost::program_options::validation_error( boost::program_options::validation_error::multiple_values_not_allowed ) ); +#else + boost::throw_exception( boost::program_options::validation_error( "multiple values not allowed" ) ); +#endif + else if ( !new_tokens.empty() ) + boost::program_options::typed_value<std::string>::xparse + (value_store, new_tokens); + else + value_store = std::string(); + } + + }; + + std::string askPassword(); + +} diff --git a/util/processinfo.cpp b/util/processinfo.cpp new file mode 100644 index 0000000..3257b5e --- /dev/null +++ b/util/processinfo.cpp @@ -0,0 +1,47 @@ +// processinfo.cpp + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "pch.h" +#include "processinfo.h" + +#include <iostream> +using namespace std; + +namespace mongo { + + class PidFileWiper { + public: + ~PidFileWiper(){ + ofstream out( path.c_str() , ios_base::out ); + out.close(); + } + + void write( const string& p ){ + path = p; + ofstream out( path.c_str() , ios_base::out ); + out << getpid() << endl; + out.close(); + } + + string path; + } pidFileWiper; + + void writePidFile( const string& path ){ + pidFileWiper.write( path ); + } + +} diff --git a/util/processinfo.h b/util/processinfo.h index b7bc90d..8e20beb 100644 --- a/util/processinfo.h +++ b/util/processinfo.h @@ -18,6 +18,7 @@ #pragma once #include <sys/types.h> +#include <string> #ifndef _WIN32 #include <unistd.h> @@ -58,5 +59,7 @@ namespace mongo { private: pid_t _pid; }; - + + void writePidFile( const std::string& path ); + } diff --git a/util/processinfo_darwin.cpp b/util/processinfo_darwin.cpp index 206c270..cb54bed 100644 --- a/util/processinfo_darwin.cpp +++ b/util/processinfo_darwin.cpp @@ -15,7 +15,7 @@ * limitations under the License. */ -#include "../stdafx.h" +#include "../pch.h" #include "processinfo.h" #include "log.h" @@ -108,7 +108,7 @@ namespace mongo { start = start - ( (unsigned long long)start % pageSize ); char x = 0; if ( mincore( start , 128 , &x ) ){ - log() << "mincore failed: " << OUTPUT_ERRNO << endl; + log() << "mincore failed: " << errnoWithDescription() << endl; return 1; } return x & 0x1; diff --git a/util/processinfo_linux2.cpp b/util/processinfo_linux2.cpp index 917e707..02a7ad0 100644 --- a/util/processinfo_linux2.cpp +++ b/util/processinfo_linux2.cpp @@ -40,11 +40,10 @@ namespace mongo { FILE * f = fopen( name , "r"); if ( ! f ){ stringstream ss; - ss << "couldn't open [" << name << "] " << OUTPUT_ERRNO; + ss << "couldn't open [" << name << "] " << errnoWithDescription(); string s = ss.str(); - msgasserted( 13276 , s.c_str() ); + msgassertedNoTrace( 13276 , s.c_str() ); } - int found = fscanf(f, "%d %s %c " "%d %d %d %d %d " @@ -232,7 +231,7 @@ namespace mongo { start = start - ( (unsigned long long)start % pageSize ); unsigned char x = 0; if ( mincore( start , 128 , &x ) ){ - log() << "mincore failed: " << OUTPUT_ERRNO << endl; + log() << "mincore failed: " << errnoWithDescription() << endl; return 1; } return x & 0x1; diff --git a/util/processinfo_none.cpp b/util/processinfo_none.cpp index 9af1766..b54cb13 100644 --- a/util/processinfo_none.cpp +++ b/util/processinfo_none.cpp @@ -15,7 +15,7 @@ * limitations under the License. */ -#include "stdafx.h" +#include "pch.h" #include "processinfo.h" #include <iostream> diff --git a/util/processinfo_win32.cpp b/util/processinfo_win32.cpp index 0705fcb..5fc6ab5 100644 --- a/util/processinfo_win32.cpp +++ b/util/processinfo_win32.cpp @@ -15,7 +15,7 @@ * limitations under the License. */ -#include "stdafx.h" +#include "pch.h" #include "processinfo.h" #include <iostream> diff --git a/util/queue.h b/util/queue.h index d48e012..35e02a8 100644 --- a/util/queue.h +++ b/util/queue.h @@ -1,4 +1,4 @@ -// queue.h +// @file queue.h /* Copyright 2009 10gen Inc. * @@ -17,7 +17,7 @@ #pragma once -#include "../stdafx.h" +#include "../pch.h" #include "../util/goodies.h" #include <queue> @@ -29,6 +29,8 @@ namespace mongo { */ template<typename T> class BlockingQueue : boost::noncopyable { public: + BlockingQueue() : _lock("BlockingQueue") { } + void push(T const& t){ scoped_lock l( _lock ); _queue.push( t ); diff --git a/util/ramlog.h b/util/ramlog.h new file mode 100644 index 0000000..393527d --- /dev/null +++ b/util/ramlog.h @@ -0,0 +1,142 @@ +// log.h + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "log.h" +#include "mongoutils/html.h" + +namespace mongo { + + class RamLog : public Tee { + enum { + N = 128, + C = 256 + }; + char lines[N][C]; + unsigned h, n; + + public: + RamLog() { + h = 0; n = 0; + for( int i = 0; i < N; i++ ) + lines[i][C-1] = 0; + } + + virtual void write(LogLevel ll, const string& str) { + char *p = lines[(h+n)%N]; + if( str.size() < C ) + strcpy(p, str.c_str()); + else + memcpy(p, str.c_str(), C-1); + if( n < N ) n++; + else h = (h+1) % N; + } + + void get( vector<const char*>& v) const { + for( unsigned x=0, i=h; x++ < n; i=(i+1)%N ) + v.push_back(lines[i]); + } + + static int repeats(const vector<const char *>& v, int i) { + for( int j = i-1; j >= 0 && j+8 > i; j-- ) { + if( strcmp(v[i]+20,v[j]+20) == 0 ) { + for( int x = 1; ; x++ ) { + if( j+x == i ) return j; + if( i+x>=(int) v.size() ) return -1; + if( strcmp(v[i+x]+20,v[j+x]+20) ) return -1; + } + return -1; + } + } + return -1; + } + + + static string clean(const vector<const char *>& v, int i, string line="") { + if( line.empty() ) line = v[i]; + if( i > 0 && strncmp(v[i], v[i-1], 11) == 0 ) + return string(" ") + line.substr(11); + return v[i]; + } + + static string color(string line) { + string s = str::after(line, "replSet "); + if( str::startsWith(s, "warning") || startsWith(s, "error") ) + return html::red(line); + if( str::startsWith(s, "info") ) { + if( str::endsWith(s, " up\n") ) + return html::green(line); + else if( str::contains(s, " down ") || str::endsWith(s, " down\n") ) + return html::yellow(line); + return line; //html::blue(line); + } + + return line; + } + + /* turn http:... into an anchor */ + string linkify(const char *s) { + const char *p = s; + const char *h = strstr(p, "http://"); + if( h == 0 ) return s; + + const char *sp = h + 7; + while( *sp && *sp != ' ' ) sp++; + + string url(h, sp-h); + stringstream ss; + ss << string(s, h-s) << "<a href=\"" << url << "\">" << url << "</a>" << sp; + return ss.str(); + } + + void toHTML(stringstream& s) { + vector<const char*> v; + get( v ); + + bool first = true; + s << "<pre>\n"; + for( int i = 0; i < (int)v.size(); i++ ) { + assert( strlen(v[i]) > 20 ); + int r = repeats(v, i); + if( r < 0 ) { + s << color( linkify( clean(v,i).c_str() ) ); + } + else { + stringstream x; + x << string(v[i], 0, 20); + int nr = (i-r); + int last = i+nr-1; + for( ; r < i ; r++ ) x << '.'; + if( 1 ) { + stringstream r; + if( nr == 1 ) r << "repeat last line"; + else r << "repeats last " << nr << " lines; ends " << string(v[last]+4,0,15); + first = false; s << html::a("", r.str(), clean(v,i,x.str())); + } + else s << x.str(); + s << '\n'; + i = last; + } + } + s << "</pre>\n"; + } + + + }; + +} diff --git a/util/ramstore.cpp b/util/ramstore.cpp new file mode 100644 index 0000000..0bdf2e2 --- /dev/null +++ b/util/ramstore.cpp @@ -0,0 +1,93 @@ +/** +* Copyright (C) 2008 10gen Inc.info +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "pch.h" +#include "mmap.h" + +namespace mongo { + + //extern bool checkNsFilesOnLoad; + +static set<RamStoreFile*> files; + +void RamStoreFile::grow(int offset, int len) { + cout << "GROW ofs:" << offset << " len:" << len; + assert( len > 0 ); + Node& n = _m[offset]; + cout << " oldlen:" << n.len << endl; + assert( n.len > 0 ); + if( len > n.len ) { + n.p = (char *) realloc(n.p, len); + memset(((char *)n.p) + n.len, 0, len - n.len); + n.len = len; + } +} + +/* maxLen can be -1 for existing data */ +void* RamStoreFile::at(int offset, int maxLen) { + if( offset != _last ) { + if( _m.count(_last) ) { + _m[_last].check(); + if( !(offset < _last || offset >= _last + _m[_last].len) ) { + cout << offset << ' ' << _last << ' ' << _m[_last].len << endl; + assert(false); + } + } + } + _last = offset; + + Node& n = _m[offset]; + if( n.len == 0 ) { + // create + if( strstr(name, ".ns") == 0 ) + cout << "CREATE " << name << " ofs:" << offset << " len:" << maxLen << endl; + assert( maxLen >= 0 ); + n.p = (char *) calloc(maxLen+1, 1); + n.len = maxLen; + } + assert( n.len >= maxLen ); + n.check(); + return n.p; + } + +void RamStoreFile::Node::check() { + assert( p[len] == 0 ); +} + +void RamStoreFile::check() { + for( std::map<int,Node>::iterator i = _m.begin(); i != _m.end(); i++ ) { + i->second.check(); + } +} + +void RamStoreFile::validate() { + for( set<RamStoreFile*>::iterator i = files.begin(); i != files.end(); i++ ) { + (*i)->check(); + } +} + +RamStoreFile::~RamStoreFile() { + check(); + files.erase(this); +} + +RamStoreFile::RamStoreFile() : _len(0) { + // checkNsFilesOnLoad = false; + files.insert(this); +} + +} + diff --git a/util/ramstore.h b/util/ramstore.h new file mode 100644 index 0000000..f75a57a --- /dev/null +++ b/util/ramstore.h @@ -0,0 +1,86 @@ +// ramstore.h + +// mmap.h + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +extern bool checkNsFilesOnLoad; + +class RamStoreFile : public MongoFile { + char name[256]; + struct Node { + char *p; + int len; + Node() : len(0) { } + void check(); + }; + std::map<int,Node> _m; + long _len; + + static void validate(); + void check(); + + int _last; + + void grow(int offset, int len); + + /* maxLen can be -1 for existing data */ + void* at(int offset, int maxLen); + +protected: + virtual void close() { + cout << "ramstore dealloc not yet implemented" << endl; + if( _len ) { + _len = 0; + } + } + virtual void flush(bool sync) { } + +public: + ~RamStoreFile(); + RamStoreFile(); + + virtual long length() { return _len; } + + class Pointer { + RamStoreFile* _f; + friend class RamStoreFile; + public: + void* at(int offset, int len) { + assert( len <= /*MaxBSONObjectSize*/4*1024*1024 + 128 ); + return _f->at(offset,len); + } + void grow(int offset, int len) { + assert( len <= /*MaxBSONObjectSize*/4*1024*1024 + 128 ); + _f->grow(offset,len); + } + bool isNull() const { return _f == 0; } + }; + + Pointer map( const char *filename ) { + assert(false); return Pointer(); + } + Pointer map(const char *_filename, long &length, int options=0) { + strncpy(name, _filename, sizeof(name)-1); + Pointer p; + p._f = this; + return p; + } + + static bool exists(boost::filesystem::path p) { + return false; + } +}; diff --git a/util/sock.cpp b/util/sock.cpp index 5beac68..c4e1a71 100644 --- a/util/sock.cpp +++ b/util/sock.cpp @@ -15,31 +15,134 @@ * limitations under the License. */ -#include "stdafx.h" +#include "pch.h" #include "sock.h" namespace mongo { - static mongo::mutex sock_mutex; + static mongo::mutex sock_mutex("sock_mutex"); - string hostbyname(const char *hostname) { - static string unknown = "0.0.0.0"; - if ( unknown == hostname ) - return unknown; + static bool ipv6 = false; + void enableIPv6(bool state) { ipv6 = state; } + bool IPv6Enabled() { return ipv6; } - scoped_lock lk(sock_mutex); -#if defined(_WIN32) - if( inet_addr(hostname) != INADDR_NONE ) - return hostname; -#else - struct in_addr temp; - if ( inet_aton( hostname, &temp ) ) - return hostname; + SockAddr::SockAddr(int sourcePort) { + memset(as<sockaddr_in>().sin_zero, 0, sizeof(as<sockaddr_in>().sin_zero)); + as<sockaddr_in>().sin_family = AF_INET; + as<sockaddr_in>().sin_port = htons(sourcePort); + as<sockaddr_in>().sin_addr.s_addr = htonl(INADDR_ANY); + addressSize = sizeof(sockaddr_in); + } + + SockAddr::SockAddr(const char * iporhost , int port) { + if (!strcmp(iporhost, "localhost")) + iporhost = "127.0.0.1"; + + if (strchr(iporhost, '/')){ +#ifdef _WIN32 + uassert(13080, "no unix socket support on windows", false); #endif - struct hostent *h; - h = gethostbyname(hostname); - if ( h == 0 ) return ""; - return inet_ntoa( *((struct in_addr *)(h->h_addr)) ); + uassert(13079, "path to unix socket too long", strlen(iporhost) < sizeof(as<sockaddr_un>().sun_path)); + as<sockaddr_un>().sun_family = AF_UNIX; + strcpy(as<sockaddr_un>().sun_path, iporhost); + addressSize = sizeof(sockaddr_un); + }else{ + addrinfo* addrs = NULL; + addrinfo hints; + memset(&hints, 0, sizeof(addrinfo)); + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_ADDRCONFIG; + hints.ai_family = (IPv6Enabled() ? AF_UNSPEC : AF_INET); + + stringstream ss; + ss << port; + int ret = getaddrinfo(iporhost, ss.str().c_str(), &hints, &addrs); + if (ret){ + log() << "getaddrinfo(\"" << iporhost << "\") failed: " << gai_strerror(ret) << endl; + *this = SockAddr(port); + }else{ + //TODO: handle other addresses in linked list; + assert(addrs->ai_addrlen <= sizeof(sa)); + memcpy(&sa, addrs->ai_addr, addrs->ai_addrlen); + addressSize = addrs->ai_addrlen; + freeaddrinfo(addrs); + } + } + } + + bool SockAddr::isLocalHost() const { + switch (getType()){ + case AF_INET: return getAddr() == "127.0.0.1"; + case AF_INET6: return getAddr() == "::1"; + case AF_UNIX: return true; + default: return false; + } + assert(false); + return false; + } + + string hostbyname(const char *hostname) { + string addr = SockAddr(hostname, 0).getAddr(); + if (addr == "0.0.0.0") + return ""; + else + return addr; + } + + class UDPConnection { + public: + UDPConnection() { + sock = 0; + } + ~UDPConnection() { + if ( sock ) { + closesocket(sock); + sock = 0; + } + } + bool init(const SockAddr& myAddr); + int recvfrom(char *buf, int len, SockAddr& sender); + int sendto(char *buf, int len, const SockAddr& EndPoint); + int mtu(const SockAddr& sa) { + return sa.isLocalHost() ? 16384 : 1480; + } + + SOCKET sock; + }; + + inline int UDPConnection::recvfrom(char *buf, int len, SockAddr& sender) { + return ::recvfrom(sock, buf, len, 0, sender.raw(), &sender.addressSize); + } + + inline int UDPConnection::sendto(char *buf, int len, const SockAddr& EndPoint) { + if ( 0 && rand() < (RAND_MAX>>4) ) { + out() << " NOTSENT "; + return 0; + } + return ::sendto(sock, buf, len, 0, EndPoint.raw(), EndPoint.addressSize); + } + + inline bool UDPConnection::init(const SockAddr& myAddr) { + sock = socket(myAddr.getType(), SOCK_DGRAM, IPPROTO_UDP); + if ( sock == INVALID_SOCKET ) { + out() << "invalid socket? " << errnoWithDescription() << endl; + return false; + } + if ( ::bind(sock, myAddr.raw(), myAddr.addressSize) != 0 ) { + out() << "udp init failed" << endl; + closesocket(sock); + sock = 0; + return false; + } + socklen_t optLen; + int rcvbuf; + if (getsockopt(sock, + SOL_SOCKET, + SO_RCVBUF, + (char*)&rcvbuf, + &optLen) != -1) + out() << "SO_RCVBUF:" << rcvbuf << endl; + return true; } void sendtest() { @@ -50,7 +153,7 @@ namespace mongo { if ( c.init(me) ) { char buf[256]; out() << "sendto: "; - out() << c.sendto(buf, sizeof(buf), dest) << " " << OUTPUT_ERRNO << endl; + out() << c.sendto(buf, sizeof(buf), dest) << " " << errnoWithDescription() << endl; } out() << "end\n"; } @@ -63,147 +166,44 @@ namespace mongo { if ( c.init(me) ) { char buf[256]; out() << "recvfrom: "; - out() << c.recvfrom(buf, sizeof(buf), sender) << " " << OUTPUT_ERRNO << endl; + out() << c.recvfrom(buf, sizeof(buf), sender) << " " << errnoWithDescription() << endl; } out() << "end listentest\n"; } void xmain(); - struct SockStartupTests { - SockStartupTests() { + #if defined(_WIN32) - WSADATA d; - if ( WSAStartup(MAKEWORD(2,2), &d) != 0 ) { - out() << "ERROR: wsastartup failed " << OUTPUT_ERRNO << endl; - problem() << "ERROR: wsastartup failed " << OUTPUT_ERRNO << endl; - dbexit( EXIT_NTSERVICE_ERROR ); + namespace { + struct WinsockInit { + WinsockInit() { + WSADATA d; + if ( WSAStartup(MAKEWORD(2,2), &d) != 0 ) { + out() << "ERROR: wsastartup failed " << errnoWithDescription() << endl; + problem() << "ERROR: wsastartup failed " << errnoWithDescription() << endl; + dbexit( EXIT_NTSERVICE_ERROR ); + } } -#endif - //out() << "ntohl:" << ntohl(256) << endl; - //sendtest(); - //listentest(); - } - } sstests; - -#if 0 - void smain() { - - WSADATA wsaData; - SOCKET RecvSocket; - sockaddr_in RecvAddr; - int Port = 27015; - char RecvBuf[1024]; - int BufLen = 1024; - sockaddr_in SenderAddr; - int SenderAddrSize = sizeof(SenderAddr); - - //----------------------------------------------- - // Initialize Winsock - WSAStartup(MAKEWORD(2,2), &wsaData); - - //----------------------------------------------- - // Create a receiver socket to receive datagrams - RecvSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); - prebindOptions( RecvSocket ); - - //----------------------------------------------- - // Bind the socket to any address and the specified port. - RecvAddr.sin_family = AF_INET; - RecvAddr.sin_port = htons(Port); - RecvAddr.sin_addr.s_addr = htonl(INADDR_ANY); - - ::bind(RecvSocket, (SOCKADDR *) &RecvAddr, sizeof(RecvAddr)); - - //----------------------------------------------- - // Call the recvfrom function to receive datagrams - // on the bound socket. - printf("Receiving datagrams...\n"); - recvfrom(RecvSocket, - RecvBuf, - BufLen, - 0, - (SOCKADDR *)&SenderAddr, - &SenderAddrSize); - - //----------------------------------------------- - // Close the socket when finished receiving datagrams - printf("Finished receiving. Closing socket.\n"); - closesocket(RecvSocket); - - //----------------------------------------------- - // Clean up and exit. - printf("Exiting.\n"); - WSACleanup(); - return; + } winsock_init; } - - - - - void xmain() { - - WSADATA wsaData; - SOCKET RecvSocket; - sockaddr_in RecvAddr; - int Port = 27015; - char RecvBuf[1024]; - int BufLen = 1024; - sockaddr_in SenderAddr; - int SenderAddrSize = sizeof(SenderAddr); - - //----------------------------------------------- - // Initialize Winsock - WSAStartup(MAKEWORD(2,2), &wsaData); - - //----------------------------------------------- - // Create a receiver socket to receive datagrams - - RecvSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); - prebindOptions( RecvSocket ); - - //----------------------------------------------- - // Bind the socket to any address and the specified port. - RecvAddr.sin_family = AF_INET; - RecvAddr.sin_port = htons(Port); - RecvAddr.sin_addr.s_addr = htonl(INADDR_ANY); - - SockAddr a(Port); - ::bind(RecvSocket, (SOCKADDR *) &a.sa, a.addressSize); -// bind(RecvSocket, (SOCKADDR *) &RecvAddr, sizeof(RecvAddr)); - - SockAddr b; - - //----------------------------------------------- - // Call the recvfrom function to receive datagrams - // on the bound socket. - printf("Receiving datagrams...\n"); - recvfrom(RecvSocket, - RecvBuf, - BufLen, - 0, - (SOCKADDR *) &b.sa, &b.addressSize); -// (SOCKADDR *)&SenderAddr, -// &SenderAddrSize); - - //----------------------------------------------- - // Close the socket when finished receiving datagrams - printf("Finished receiving. Closing socket.\n"); - closesocket(RecvSocket); - - //----------------------------------------------- - // Clean up and exit. - printf("Exiting.\n"); - WSACleanup(); - return; - } - #endif + SockAddr unknownAddress( "0.0.0.0", 0 ); + ListeningSockets* ListeningSockets::_instance = new ListeningSockets(); ListeningSockets* ListeningSockets::get(){ return _instance; } + + string getHostNameCached(){ + static string host; + if ( host.empty() ){ + string s = getHostName(); + host = s; + } + return host; + } } // namespace mongo diff --git a/util/sock.h b/util/sock.h index ee7a7ae..300c24d 100644 --- a/util/sock.h +++ b/util/sock.h @@ -17,32 +17,43 @@ #pragma once -#include "../stdafx.h" +#include "../pch.h" #include <stdio.h> #include <sstream> #include "goodies.h" - -#ifdef _WIN32 -#include <windows.h> -#include <winsock.h> -#endif +#include "../db/jsobj.h" namespace mongo { + const int SOCK_FAMILY_UNKNOWN_ERROR=13078; + #if defined(_WIN32) + typedef short sa_family_t; typedef int socklen_t; inline int getLastError() { return WSAGetLastError(); } + inline const char* gai_strerror(int code) { + return ::gai_strerrorA(code); + } inline void disableNagle(int sock) { int x = 1; if ( setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char *) &x, sizeof(x)) ) out() << "ERROR: disableNagle failed" << endl; + if ( setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (char *) &x, sizeof(x)) ) + out() << "ERROR: SO_KEEPALIVE failed" << endl; } inline void prebindOptions( int sock ) { } + + // This won't actually be used on windows + struct sockaddr_un { + short sun_family; + char sun_path[108]; // length from unix header + }; + #else } // namespace mongo @@ -50,11 +61,19 @@ namespace mongo { #include <sys/socket.h> #include <sys/types.h> #include <sys/socket.h> +#include <sys/un.h> #include <netinet/in.h> #include <netinet/tcp.h> #include <arpa/inet.h> #include <errno.h> #include <netdb.h> +#ifdef __openbsd__ +# include <sys/uio.h> +#endif + +#ifndef AI_ADDRCONFIG +# define AI_ADDRCONFIG 0 +#endif namespace mongo { @@ -74,7 +93,12 @@ namespace mongo { #endif if ( setsockopt(sock, level, TCP_NODELAY, (char *) &x, sizeof(x)) ) - log() << "ERROR: disableNagle failed" << endl; + log() << "ERROR: disableNagle failed: " << errnoWithDescription() << endl; + +#ifdef SO_KEEPALIVE + if ( setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (char *) &x, sizeof(x)) ) + log() << "ERROR: SO_KEEPALIVE failed: " << errnoWithDescription() << endl; +#endif } inline void prebindOptions( int sock ) { @@ -87,161 +111,149 @@ namespace mongo { #endif - inline void setSockReceiveTimeout(int sock, int secs) { -// todo - finish - works? + inline string makeUnixSockPath(int port){ + return "/tmp/mongodb-" + BSONObjBuilder::numStr(port) + ".sock"; + } + + inline void setSockTimeouts(int sock, int secs) { struct timeval tv; - tv.tv_sec = 0;//secs; - tv.tv_usec = 1000; - int rc = setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (char *) &tv, sizeof(tv)); - if ( rc ) { - out() << "ERROR: setsockopt RCVTIMEO failed rc:" << rc << " " << OUTPUT_ERRNO << " secs:" << secs << " sock:" << sock << endl; - } + tv.tv_sec = secs; + tv.tv_usec = 0; + bool report = logLevel > 3; // solaris doesn't provide these + DEV report = true; + bool ok = setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (char *) &tv, sizeof(tv) ) == 0; + if( report && !ok ) log() << "unabled to set SO_RCVTIMEO" << endl; + ok = setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, (char *) &tv, sizeof(tv) ) == 0; + DEV if( report && !ok ) log() << "unabled to set SO_RCVTIMEO" << endl; } // If an ip address is passed in, just return that. If a hostname is passed // in, look up its ip and return that. Returns "" on failure. string hostbyname(const char *hostname); + void enableIPv6(bool state=true); + bool IPv6Enabled(); + struct SockAddr { SockAddr() { - addressSize = sizeof(sockaddr_in); + addressSize = sizeof(sa); memset(&sa, 0, sizeof(sa)); + sa.ss_family = AF_UNSPEC; } SockAddr(int sourcePort); /* listener side */ SockAddr(const char *ip, int port); /* EndPoint (remote) side, or if you want to specify which interface locally */ - struct sockaddr_in sa; - socklen_t addressSize; + template <typename T> + T& as() { return *(T*)(&sa); } + template <typename T> + const T& as() const { return *(const T*)(&sa); } - bool isLocalHost() const { -#if defined(_WIN32) - return sa.sin_addr.S_un.S_addr == 0x100007f; -#else - return sa.sin_addr.s_addr == 0x100007f; -#endif + string toString(bool includePort=true) const{ + string out = getAddr(); + if (includePort && getType() != AF_UNIX && getType() != AF_UNSPEC) + out += ':' + BSONObjBuilder::numStr(getPort()); + return out; } - string toString() const{ - stringstream out; - out << inet_ntoa(sa.sin_addr) << ':' - << ntohs(sa.sin_port); - return out.str(); + // returns one of AF_INET, AF_INET6, or AF_UNIX + sa_family_t getType() const { + return sa.ss_family; } - operator string() const{ - return toString(); + unsigned getPort() const { + switch (getType()){ + case AF_INET: return ntohs(as<sockaddr_in>().sin_port); + case AF_INET6: return ntohs(as<sockaddr_in6>().sin6_port); + case AF_UNIX: return 0; + case AF_UNSPEC: return 0; + default: massert(SOCK_FAMILY_UNKNOWN_ERROR, "unsupported address family", false); return 0; + } } - unsigned getPort() { - return sa.sin_port; + string getAddr() const { + switch (getType()){ + case AF_INET: + case AF_INET6: { + const int buflen=128; + char buffer[buflen]; + int ret = getnameinfo(raw(), addressSize, buffer, buflen, NULL, 0, NI_NUMERICHOST); + massert(13082, gai_strerror(ret), ret == 0); + return buffer; + } + + case AF_UNIX: return (addressSize > 2 ? as<sockaddr_un>().sun_path : "anonymous unix socket"); + case AF_UNSPEC: return "(NONE)"; + default: massert(SOCK_FAMILY_UNKNOWN_ERROR, "unsupported address family", false); return ""; + } } - bool localhost() const { return inet_addr( "127.0.0.1" ) == sa.sin_addr.s_addr; } + bool isLocalHost() const; bool operator==(const SockAddr& r) const { - return sa.sin_addr.s_addr == r.sa.sin_addr.s_addr && - sa.sin_port == r.sa.sin_port; + if (getType() != r.getType()) + return false; + + if (getPort() != r.getPort()) + return false; + + switch (getType()){ + case AF_INET: return as<sockaddr_in>().sin_addr.s_addr == r.as<sockaddr_in>().sin_addr.s_addr; + case AF_INET6: return memcmp(as<sockaddr_in6>().sin6_addr.s6_addr, r.as<sockaddr_in6>().sin6_addr.s6_addr, sizeof(in6_addr)) == 0; + case AF_UNIX: return strcmp(as<sockaddr_un>().sun_path, r.as<sockaddr_un>().sun_path) == 0; + case AF_UNSPEC: return true; // assume all unspecified addresses are the same + default: massert(SOCK_FAMILY_UNKNOWN_ERROR, "unsupported address family", false); + } } bool operator!=(const SockAddr& r) const { return !(*this == r); } bool operator<(const SockAddr& r) const { - if ( sa.sin_port >= r.sa.sin_port ) + if (getType() < r.getType()) + return true; + else if (getType() > r.getType()) return false; - return sa.sin_addr.s_addr < r.sa.sin_addr.s_addr; - } - }; - const int MaxMTU = 16384; + if (getPort() < r.getPort()) + return true; + else if (getPort() > r.getPort()) + return false; - class UDPConnection { - public: - UDPConnection() { - sock = 0; - } - ~UDPConnection() { - if ( sock ) { - closesocket(sock); - sock = 0; + switch (getType()){ + case AF_INET: return as<sockaddr_in>().sin_addr.s_addr < r.as<sockaddr_in>().sin_addr.s_addr; + case AF_INET6: return memcmp(as<sockaddr_in6>().sin6_addr.s6_addr, r.as<sockaddr_in6>().sin6_addr.s6_addr, sizeof(in6_addr)) < 0; + case AF_UNIX: return strcmp(as<sockaddr_un>().sun_path, r.as<sockaddr_un>().sun_path) < 0; + case AF_UNSPEC: return false; + default: massert(SOCK_FAMILY_UNKNOWN_ERROR, "unsupported address family", false); } } - bool init(const SockAddr& myAddr); - int recvfrom(char *buf, int len, SockAddr& sender); - int sendto(char *buf, int len, const SockAddr& EndPoint); - int mtu(const SockAddr& sa) { - return sa.isLocalHost() ? 16384 : 1480; - } - - SOCKET sock; - }; - - inline int UDPConnection::recvfrom(char *buf, int len, SockAddr& sender) { - return ::recvfrom(sock, buf, len, 0, (sockaddr *) &sender.sa, &sender.addressSize); - } - inline int UDPConnection::sendto(char *buf, int len, const SockAddr& EndPoint) { - if ( 0 && rand() < (RAND_MAX>>4) ) { - out() << " NOTSENT "; - // out() << curTimeMillis() << " .TEST: NOT SENDING PACKET" << endl; - return 0; - } - return ::sendto(sock, buf, len, 0, (sockaddr *) &EndPoint.sa, EndPoint.addressSize); - } + const sockaddr* raw() const {return (sockaddr*)&sa;} + sockaddr* raw() {return (sockaddr*)&sa;} - inline bool UDPConnection::init(const SockAddr& myAddr) { - sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); - if ( sock == INVALID_SOCKET ) { - out() << "invalid socket? " << OUTPUT_ERRNO << endl; - return false; - } - //out() << sizeof(sockaddr_in) << ' ' << myAddr.addressSize << endl; - if ( ::bind(sock, (sockaddr *) &myAddr.sa, myAddr.addressSize) != 0 ) { - out() << "udp init failed" << endl; - closesocket(sock); - sock = 0; - return false; - } - socklen_t optLen; - int rcvbuf; - if (getsockopt(sock, - SOL_SOCKET, - SO_RCVBUF, - (char*)&rcvbuf, - &optLen) != -1) - out() << "SO_RCVBUF:" << rcvbuf << endl; - return true; - } + socklen_t addressSize; + private: + struct sockaddr_storage sa; + }; - inline SockAddr::SockAddr(int sourcePort) { - memset(sa.sin_zero, 0, sizeof(sa.sin_zero)); - sa.sin_family = AF_INET; - sa.sin_port = htons(sourcePort); - sa.sin_addr.s_addr = htonl(INADDR_ANY); - addressSize = sizeof(sa); - } + extern SockAddr unknownAddress; // ( "0.0.0.0", 0 ) - inline SockAddr::SockAddr(const char * iporhost , int port) { - string ip = hostbyname( iporhost ); - memset(sa.sin_zero, 0, sizeof(sa.sin_zero)); - sa.sin_family = AF_INET; - sa.sin_port = htons(port); - sa.sin_addr.s_addr = inet_addr(ip.c_str()); - addressSize = sizeof(sa); - } + const int MaxMTU = 16384; inline string getHostName() { char buf[256]; int ec = gethostname(buf, 127); if ( ec || *buf == 0 ) { - log() << "can't get this server's hostname " << OUTPUT_ERRNO << endl; + log() << "can't get this server's hostname " << errnoWithDescription() << endl; return ""; } return buf; } + string getHostNameCached(); + class ListeningSockets { public: - ListeningSockets() : _sockets( new set<int>() ){ + ListeningSockets() : _mutex("ListeningSockets"), _sockets( new set<int>() ){ } void add( int sock ){ diff --git a/util/stringutils.cpp b/util/stringutils.cpp new file mode 100644 index 0000000..3f989fd --- /dev/null +++ b/util/stringutils.cpp @@ -0,0 +1,44 @@ +// stringutils.cpp + +/** +* Copyright (C) 2008 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "pch.h" + +namespace mongo { + + void splitStringDelim( const string& str , vector<string>* res , char delim ){ + if ( str.empty() ) + return; + + size_t beg = 0; + size_t pos = str.find( delim ); + while ( pos != string::npos ){ + res->push_back( str.substr( beg, pos - beg) ); + beg = ++pos; + pos = str.find( delim, beg ); + } + res->push_back( str.substr( beg ) ); + } + + void joinStringDelim( const vector<string>& strs , string* res , char delim ){ + for ( vector<string>::const_iterator it = strs.begin(); it != strs.end(); ++it ){ + if ( it !=strs.begin() ) res->push_back( delim ); + res->append( *it ); + } + } + +} // namespace mongo diff --git a/util/stringutils.h b/util/stringutils.h new file mode 100644 index 0000000..6b79c33 --- /dev/null +++ b/util/stringutils.h @@ -0,0 +1,43 @@ +// stringutils.h + +/* Copyright 2010 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef UTIL_STRING_UTILS_HEADER +#define UTIL_STRING_UTILS_HEADER + +namespace mongo { + + void splitStringDelim( const string& str , vector<string>* res , char delim ); + + void joinStringDelim( const vector<string>& strs , string* res , char delim ); + + inline string tolowerString( const string& input ){ + string::size_type sz = input.size(); + + boost::scoped_array<char> line(new char[sz+1]); + char * copy = line.get(); + + for ( string::size_type i=0; i<sz; i++ ){ + char c = input[i]; + copy[i] = (char)tolower( (int)c ); + } + copy[sz] = 0; + return string(copy); + } + +} // namespace mongo + +#endif // UTIL_STRING_UTILS_HEADER diff --git a/util/text.cpp b/util/text.cpp new file mode 100644 index 0000000..f381e01 --- /dev/null +++ b/util/text.cpp @@ -0,0 +1,117 @@ +// text.cpp + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "pch.h" +#include "text.h" +#include "unittest.h" + +namespace mongo{ + + inline int leadingOnes(unsigned char c){ + if (c < 0x80) return 0; + static const char _leadingOnes[128] = { + 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, // 0x80 - 0x8F + 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, // 0x90 - 0x99 + 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, // 0xA0 - 0xA9 + 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, // 0xB0 - 0xB9 + 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, // 0xC0 - 0xC9 + 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, // 0xD0 - 0xD9 + 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, // 0xE0 - 0xE9 + 4, 4, 4, 4, 4, 4, 4, 4, // 0xF0 - 0xF7 + 5, 5, 5, 5, // 0xF8 - 0xFB + 6, 6, // 0xFC - 0xFD + 7, // 0xFE + 8, // 0xFF + }; + return _leadingOnes[c & 0x7f]; + + } + + bool isValidUTF8(const char *s){ + int left = 0; // how many bytes are left in the current codepoint + while (*s){ + const unsigned char c = (unsigned char) *(s++); + const int ones = leadingOnes(c); + if (left){ + if (ones != 1) return false; // should be a continuation byte + left--; + }else{ + if (ones == 0) continue; // ASCII byte + if (ones == 1) return false; // unexpected continuation byte + if (c > 0xF4) return false; // codepoint too large (< 0x10FFFF) + if (c == 0xC0 || c == 0xC1) return false; // codepoints <= 0x7F shouldn't be 2 bytes + + // still valid + left = ones-1; + } + } + if (left!=0) return false; // string ended mid-codepoint + return true; + } + + #if defined(_WIN32) + + std::string toUtf8String(const std::wstring& wide) + { + if (wide.size() > boost::integer_traits<int>::const_max) + throw std::length_error( + "Wide string cannot be more than INT_MAX characters long."); + if (wide.size() == 0) + return ""; + + // Calculate necessary buffer size + int len = ::WideCharToMultiByte( + CP_UTF8, 0, wide.c_str(), static_cast<int>(wide.size()), + NULL, 0, NULL, NULL); + + // Perform actual conversion + if (len > 0) + { + std::vector<char> buffer(len); + len = ::WideCharToMultiByte( + CP_UTF8, 0, wide.c_str(), static_cast<int>(wide.size()), + &buffer[0], static_cast<int>(buffer.size()), NULL, NULL); + if (len > 0) + { + assert(len == static_cast<int>(buffer.size())); + return std::string(&buffer[0], buffer.size()); + } + } + + throw boost::system::system_error( + ::GetLastError(), boost::system::system_category); + } + +#if defined(_UNICODE) + std::wstring toWideString(const char *s) { + std::basic_ostringstream<TCHAR> buf; + buf << s; + return buf.str(); + } +#endif + + #endif + + struct TextUnitTest : public UnitTest { + void run() { + assert( parseLL("123") == 123 ); + assert( parseLL("-123000000000") == -123000000000LL ); + } + } textUnitTest; + +} + diff --git a/util/text.h b/util/text.h new file mode 100644 index 0000000..4ba622f --- /dev/null +++ b/util/text.h @@ -0,0 +1,142 @@ +// text.h +/* + * Copyright 2010 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +namespace mongo { + + class StringSplitter { + public: + StringSplitter( const char * big , const char * splitter ) + : _big( big ) , _splitter( splitter ){ + } + + bool more(){ + return _big[0]; + } + + string next(){ + const char * foo = strstr( _big , _splitter ); + if ( foo ){ + string s( _big , foo - _big ); + _big = foo + 1; + while ( *_big && strstr( _big , _splitter ) == _big ) + _big++; + return s; + } + + string s = _big; + _big += strlen( _big ); + return s; + } + + void split( vector<string>& l ){ + while ( more() ){ + l.push_back( next() ); + } + } + + vector<string> split(){ + vector<string> l; + split( l ); + return l; + } + + static vector<string> split( const string& big , const string& splitter ){ + StringSplitter ss( big.c_str() , splitter.c_str() ); + return ss.split(); + } + + static string join( vector<string>& l , const string& split ){ + stringstream ss; + for ( unsigned i=0; i<l.size(); i++ ){ + if ( i > 0 ) + ss << split; + ss << l[i]; + } + return ss.str(); + } + + private: + const char * _big; + const char * _splitter; + }; + + /* This doesn't defend against ALL bad UTF8, but it will guarantee that the + * string can be converted to sequence of codepoints. However, it doesn't + * guarantee that the codepoints are valid. + */ + bool isValidUTF8(const char *s); + inline bool isValidUTF8(string s) { return isValidUTF8(s.c_str()); } + +#if defined(_WIN32) + + std::string toUtf8String(const std::wstring& wide); + + std::wstring toWideString(const char *s); + + /* like toWideString but UNICODE macro sensitive */ +# if !defined(_UNICODE) +#error temp error + inline std::string toNativeString(const char *s) { return s; } +# else + inline std::wstring toNativeString(const char *s) { return toWideString(s); } +# endif + +#endif + + // expect that n contains a base ten number and nothing else after it + // NOTE win version hasn't been tested directly + inline long long parseLL( const char *n ) { + long long ret; + uassert( 13307, "cannot convert empty string to long long", *n != 0 ); +#if !defined(_WIN32) + char *endPtr = 0; + errno = 0; + ret = strtoll( n, &endPtr, 10 ); + uassert( 13305, "could not convert string to long long", *endPtr == 0 && errno == 0 ); +#elif _MSC_VER>=1600 // 1600 is VS2k10 1500 is VS2k8 + size_t endLen = 0; + try { + ret = stoll( n, &endLen, 10 ); + } catch ( ... ) { + endLen = 0; + } + uassert( 13306, "could not convert string to long long", endLen != 0 && n[ endLen ] == 0 ); +#else // stoll() wasn't introduced until VS 2010. + char* endPtr = 0; + ret = _strtoi64( n, &endPtr, 10 ); + uassert( 13310, "could not convert string to long long", (*endPtr == 0) && (ret != _I64_MAX) && (ret != _I64_MIN) ); +#endif // !defined(_WIN32) + return ret; + } +} diff --git a/util/thread_pool.cpp b/util/thread_pool.cpp deleted file mode 100644 index 77d0d05..0000000 --- a/util/thread_pool.cpp +++ /dev/null @@ -1,139 +0,0 @@ -/* threadpool.cpp -*/ - -/* Copyright 2009 10gen Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "stdafx.h" -#include "thread_pool.h" -#include "mvar.h" - - -namespace mongo{ -namespace threadpool{ - -// Worker thread -class Worker : boost::noncopyable { -public: - explicit Worker(ThreadPool& owner) - : _owner(owner) - , _is_done(true) - , _thread(boost::bind(&Worker::loop, this)) - {} - - // destructor will block until current operation is completed - // Acts as a "join" on this thread - ~Worker(){ - _task.put(Task()); - _thread.join(); - } - - void set_task(Task& func){ - assert(!func.empty()); - assert(_is_done); - _is_done = false; - - _task.put(func); - } - - private: - ThreadPool& _owner; - MVar<Task> _task; - bool _is_done; // only used for error detection - boost::thread _thread; - - void loop(){ - while (true) { - Task task = _task.take(); - if (task.empty()) - break; // ends the thread - - try { - task(); - } catch (std::exception e){ - log() << "Unhandled exception in worker thread: " << e.what() << endl;; - } catch (...){ - log() << "Unhandled non-exception in worker thread" << endl; - } - _is_done = true; - _owner.task_done(this); - } - } -}; - -ThreadPool::ThreadPool(int nThreads) - : _tasksRemaining(0) - , _nThreads(nThreads) -{ - scoped_lock lock(_mutex); - while (nThreads-- > 0){ - Worker* worker = new Worker(*this); - _freeWorkers.push_front(worker); - } -} - -ThreadPool::~ThreadPool(){ - join(); - - assert(_tasks.empty()); - - // O(n) but n should be small - assert(_freeWorkers.size() == (unsigned)_nThreads); - - while(!_freeWorkers.empty()){ - delete _freeWorkers.front(); - _freeWorkers.pop_front(); - } -} - -void ThreadPool::join(){ - scoped_lock lock(_mutex); - while(_tasksRemaining){ - _condition.wait(lock.boost()); - } -} - -void ThreadPool::schedule(Task task){ - scoped_lock lock(_mutex); - - _tasksRemaining++; - - if (!_freeWorkers.empty()){ - _freeWorkers.front()->set_task(task); - _freeWorkers.pop_front(); - }else{ - _tasks.push_back(task); - } -} - -// should only be called by a worker from the worker thread -void ThreadPool::task_done(Worker* worker){ - scoped_lock lock(_mutex); - - if (!_tasks.empty()){ - worker->set_task(_tasks.front()); - _tasks.pop_front(); - }else{ - _freeWorkers.push_front(worker); - } - - _tasksRemaining--; - - if(_tasksRemaining == 0) - _condition.notify_all(); -} - -} //namespace threadpool -} //namespace mongo diff --git a/util/util.cpp b/util/util.cpp index 8ae00f3..02abfa9 100644 --- a/util/util.cpp +++ b/util/util.cpp @@ -1,4 +1,4 @@ -// util.cpp +// @file util.cpp /* Copyright 2009 10gen Inc. * @@ -15,7 +15,7 @@ * limitations under the License. */ -#include "stdafx.h" +#include "pch.h" #include "goodies.h" #include "unittest.h" #include "file_allocator.h" @@ -23,18 +23,71 @@ namespace mongo { - vector<UnitTest*> *UnitTest::tests = 0; - bool UnitTest::running = false; + boost::thread_specific_ptr<string> _threadName; + + void _setThreadName( const char * name ){ + static int N = 0; + if ( strcmp( name , "conn" ) == 0 ){ + stringstream ss; + ss << name << ++N; + _threadName.reset( new string( ss.str() ) ); + } + else { + _threadName.reset( new string(name) ); + } + } + +#if defined(_WIN32) +#define MS_VC_EXCEPTION 0x406D1388 +#pragma pack(push,8) + typedef struct tagTHREADNAME_INFO + { + DWORD dwType; // Must be 0x1000. + LPCSTR szName; // Pointer to name (in user addr space). + DWORD dwThreadID; // Thread ID (-1=caller thread). + DWORD dwFlags; // Reserved for future use, must be zero. + } THREADNAME_INFO; +#pragma pack(pop) + + void setThreadName(const char *name) + { + _setThreadName( name ); + Sleep(10); + THREADNAME_INFO info; + info.dwType = 0x1000; + info.szName = name; + info.dwThreadID = -1; + info.dwFlags = 0; + __try + { + RaiseException( MS_VC_EXCEPTION, 0, sizeof(info)/sizeof(ULONG_PTR), (ULONG_PTR*)&info ); + } + __except(EXCEPTION_EXECUTE_HANDLER) + { + } + } +#else + void setThreadName(const char * name ) { + _setThreadName( name ); + } +#endif - Nullstream nullstream; + string getThreadName(){ + string * s = _threadName.get(); + if ( s ) + return *s; + return ""; + } - thread_specific_ptr<Logstream> Logstream::tsp; + vector<UnitTest*> *UnitTest::tests = 0; + bool UnitTest::running = false; const char *default_getcurns() { return ""; } const char * (*getcurns)() = default_getcurns; int logLevel = 0; - mongo::mutex Logstream::mutex; + int tlogLevel = 0; + mongo::mutex Logstream::mutex("Logstream"); int Logstream::doneSetup = Logstream::magicNumber(); bool goingAway = false; @@ -106,35 +159,21 @@ namespace mongo { void rawOut( const string &s ) { if( s.empty() ) return; - char now[64]; - time_t_to_String(time(0), now); - now[20] = 0; -#if defined(_WIN32) - (std::cout << now << " " << s).flush(); -#else - write( STDOUT_FILENO, now, 20 ); - write( STDOUT_FILENO, " ", 1 ); - write( STDOUT_FILENO, s.c_str(), s.length() ); - fsync( STDOUT_FILENO ); -#endif - } -#ifndef _SCONS - // only works in scons - const char * gitVersion(){ return ""; } - const char * sysInfo(){ return ""; } -#endif + boost::scoped_array<char> buf_holder(new char[32 + s.size()]); + char * buf = buf_holder.get(); + + time_t_to_String( time(0) , buf ); + buf[20] = ' '; + strncpy( buf + 21 , s.c_str() , s.size() ); + buf[21+s.size()] = '\n'; + buf[21+s.size()+1] = 0; - void printGitVersion() { log() << "git version: " << gitVersion() << endl; } - void printSysInfo() { log() << "sys info: " << sysInfo() << endl; } - string mongodVersion() { - stringstream ss; - ss << "db version v" << versionString << ", pdfile version " << VERSION << "." << VERSION_MINOR; - return ss.str(); + Logstream::logLockless( buf ); } ostream& operator<<( ostream &s, const ThreadSafeString &o ){ - s << (string)o; + s << o.toString(); return s; } diff --git a/util/version.cpp b/util/version.cpp new file mode 100644 index 0000000..c49c064 --- /dev/null +++ b/util/version.cpp @@ -0,0 +1,86 @@ +#include "pch.h" + +#include <cstdlib> +#include <iostream> +#include <iomanip> +#include <sstream> +#include <string> + +#include "version.h" + +namespace mongo { + + // + // mongo processes version support + // + + const char versionString[] = "1.6.0"; + + string mongodVersion() { + stringstream ss; + ss << "db version v" << versionString << ", pdfile version " << VERSION << "." << VERSION_MINOR; + return ss.str(); + } + + // + // git version support + // + +#ifndef _SCONS + // only works in scons + const char * gitVersion(){ return "not-scons"; } +#endif + + void printGitVersion() { log() << "git version: " << gitVersion() << endl; } + + // + // sys info support + // + +#ifndef _SCONS +#if defined(_WIN32) + string sysInfo(){ + stringstream ss; + ss << "not-scons win"; + ss << " mscver:" << _MSC_FULL_VER << " built:" << __DATE__; + ss << " boostver:" << BOOST_VERSION; +#if( !defined(_MT) ) +#error _MT is not defined +#endif + ss << (sizeof(char *) == 8) ? " 64bit" : " 32bit"; + return ss.str(); + } +#else + string sysInfo(){ return ""; } +#endif +#endif + + void printSysInfo() { log() << "sys info: " << sysInfo() << endl; } + + // + // 32 bit systems warning + // + + void show_32_warning(){ + bool warned = false; + { + const char * foo = strchr( versionString , '.' ) + 1; + int bar = atoi( foo ); + if ( ( 2 * ( bar / 2 ) ) != bar ) { + cout << "\n** NOTE: This is a development version (" << versionString << ") of MongoDB."; + cout << "\n** Not recommended for production. \n" << endl; + warned = true; + } + } + + if ( sizeof(int*) != 4 ) + return; + + if( !warned ) // prettier this way + cout << endl; + cout << "** NOTE: when using MongoDB 32 bit, you are limited to about 2 gigabytes of data" << endl; + cout << "** see http://blog.mongodb.org/post/137788967/32-bit-limitations" << endl; + cout << endl; + } + +} diff --git a/util/version.h b/util/version.h new file mode 100644 index 0000000..70ddeb8 --- /dev/null +++ b/util/version.h @@ -0,0 +1,24 @@ +#ifndef UTIL_VERSION_HEADER +#define UTIL_VERSION_HEADER + +#include <string> + +namespace mongo { + + using std::string; + + // mongo version + extern const char versionString[]; + string mongodVersion(); + + const char * gitVersion(); + void printGitVersion(); + + string sysInfo(); + void printSysInfo(); + + void show_32_warning(); + +} // namespace mongo + +#endif // UTIL_VERSION_HEADER diff --git a/util/winutil.h b/util/winutil.h new file mode 100644 index 0000000..b69b69a --- /dev/null +++ b/util/winutil.h @@ -0,0 +1,44 @@ +// @file winutil.cpp : Windows related utility functions +// +// /** +// * Copyright (C) 2008 10gen Inc. +// * +// * This program is free software: you can redistribute it and/or modify +// * it under the terms of the GNU Affero General Public License, version 3, +// * as published by the Free Software Foundation. +// * +// * This program is distributed in the hope that it will be useful, +// * but WITHOUT ANY WARRANTY; without even the implied warranty of +// * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// * GNU Affero General Public License for more details. +// * +// * You should have received a copy of the GNU Affero General Public License +// * along with this program. If not, see <http://www.gnu.org/licenses/>. +// */ +// +// #include "pch.h" + +#pragma once + +#if defined(_WIN32) +#include <windows.h> +#include "text.h" + +namespace mongo { + + inline string GetWinErrMsg(DWORD err) { + LPTSTR errMsg; + ::FormatMessage( FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM, NULL, err, 0, (LPTSTR)&errMsg, 0, NULL ); + std::string errMsgStr = toUtf8String( errMsg ); + ::LocalFree( errMsg ); + // FormatMessage() appends a newline to the end of error messages, we trim it because endl flushes the buffer. + errMsgStr = errMsgStr.erase( errMsgStr.length() - 2 ); + std::ostringstream output; + output << errMsgStr << " (" << err << ")"; + + return output.str(); + } +} + +#endif + |