diff options
author | Antonin Kral <a.kral@bobek.cz> | 2010-08-11 12:38:57 +0200 |
---|---|---|
committer | Antonin Kral <a.kral@bobek.cz> | 2010-08-11 12:38:57 +0200 |
commit | 7645618fd3914cb8a20561625913c20d49504a49 (patch) | |
tree | 8370f846f58f6d71165b7a0e2eda04648584ec76 /s/server.cpp | |
parent | 68c73c3c7608b4c87f07440dc3232801720b1168 (diff) | |
download | mongodb-7645618fd3914cb8a20561625913c20d49504a49.tar.gz |
Imported Upstream version 1.6.0
Diffstat (limited to 's/server.cpp')
-rw-r--r-- | s/server.cpp | 128 |
1 files changed, 92 insertions, 36 deletions
diff --git a/s/server.cpp b/s/server.cpp index 6141816..5c6ac9b 100644 --- a/s/server.cpp +++ b/s/server.cpp @@ -16,26 +16,30 @@ * along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#include "stdafx.h" +#include "pch.h" #include "../util/message.h" #include "../util/unittest.h" #include "../client/connpool.h" #include "../util/message_server.h" +#include "../util/stringutils.h" +#include "../util/version.h" +#include "../db/dbwebserver.h" #include "server.h" #include "request.h" #include "config.h" #include "chunk.h" +#include "balance.h" +#include "grid.h" +#include "cursors.h" namespace mongo { - + CmdLine cmdLine; Database *database = 0; string mongosCommand; - string ourHostname; - OID serverID; bool dbexitCalled = false; - + bool inShutdown(){ return dbexitCalled; } @@ -60,7 +64,8 @@ namespace mongo { class ShardingConnectionHook : public DBConnectionHook { public: virtual void onCreate( DBClientBase * conn ){ - conn->simpleCommand( "admin" , 0 , "switchtoclienterrors" ); + if ( conn->type() != ConnectionString::SYNC ) + conn->simpleCommand( "admin" , 0 , "switchtoclienterrors" ); } virtual void onHandedOut( DBClientBase * conn ){ ClientInfo::get()->addShard( conn->getServerAddress() ); @@ -70,47 +75,66 @@ namespace mongo { class ShardedMessageHandler : public MessageHandler { public: virtual ~ShardedMessageHandler(){} + virtual void process( Message& m , AbstractMessagingPort* p ){ + assert( p ); Request r( m , p ); + + LastError * le = lastError.startRequest( m , r.getClientId() ); + assert( le ); + if ( logLevel > 5 ){ log(5) << "client id: " << hex << r.getClientId() << "\t" << r.getns() << "\t" << dec << r.op() << endl; } try { + r.init(); setClientId( r.getClientId() ); r.process(); } catch ( DBException& e ){ - m.data->id = r.id(); + le->raiseError( e.getCode() , e.what() ); + + m.header()->id = r.id(); log() << "UserException: " << e.what() << endl; if ( r.expectResponse() ){ - BSONObj err = BSON( "$err" << e.what() ); - replyToQuery( QueryResult::ResultFlag_ErrSet, p , m , err ); + BSONObj err = BSON( "$err" << e.what() << "code" << e.getCode() ); + replyToQuery( ResultFlag_ErrSet, p , m , err ); } } } + + virtual void disconnected( AbstractMessagingPort* p ){ + ClientInfo::disconnect( p->getClientId() ); + lastError.disconnect( p->getClientId() ); + } }; void sighandler(int sig){ - dbexit(EXIT_CLEAN, (string("recieved signal ") + BSONObjBuilder::numStr(sig)).c_str()); + dbexit(EXIT_CLEAN, (string("received signal ") + BSONObjBuilder::numStr(sig)).c_str()); } void setupSignals(){ - // needed for cmdLine, btu we do it in init() + signal(SIGTERM, sighandler); + signal(SIGINT, sighandler); } void init(){ serverID.init(); setupSIGTRAPforGDB(); - signal(SIGTERM, sighandler); - signal(SIGINT, sighandler); + setupCoreSignals(); + setupSignals(); } - void start() { + void start( const MessageServer::Options& opts ){ + balancer.go(); + cursorCache.startTimeoutThread(); + log() << "waiting for connections on port " << cmdLine.port << endl; //DbGridListener l(port); //l.listen(); ShardedMessageHandler handler; - MessageServer * server = createServer( cmdLine.port , &handler ); + MessageServer * server = createServer( opts , &handler ); + server->setAsTimeTracker(); server->run(); } @@ -120,7 +144,7 @@ namespace mongo { } void printShardingVersionInfo(){ - log() << mongosCommand << " v0.3 (alpha 3) starting (--help for usage)" << endl; + log() << mongosCommand << " " << mongodVersion() << " starting (--help for usage)" << endl; printGitVersion(); printSysInfo(); } @@ -146,8 +170,11 @@ int main(int argc, char* argv[], char *envp[] ) { options.add_options() ( "configdb" , po::value<string>() , "1 or 3 comma separated config servers" ) ( "test" , "just run unit tests" ) + ( "upgrade" , "upgrade meta data version" ) + ( "chunkSize" , po::value<int>(), "maximum amount of data per chunk" ) + ( "ipv6", "enable IPv6 support (disabled by default)" ) ; - + // parse options po::variables_map params; @@ -164,6 +191,13 @@ int main(int argc, char* argv[], char *envp[] ) { return 0; } + if ( params.count( "chunkSize" ) ){ + Chunk::MaxChunkSize = params["chunkSize"].as<int>() * 1024 * 1024; + } + + if ( params.count( "ipv6" ) ){ + enableIPv6(); + } if ( params.count( "test" ) ){ logLevel = 5; @@ -178,23 +212,33 @@ int main(int argc, char* argv[], char *envp[] ) { } vector<string> configdbs; - { - string s = params["configdb"].as<string>(); - while ( true ){ - size_t idx = s.find( ',' ); - if ( idx == string::npos ){ - configdbs.push_back( s ); - break; - } - configdbs.push_back( s.substr( 0 , idx ) ); - s = s.substr( idx + 1 ); - } - } - + splitStringDelim( params["configdb"].as<string>() , &configdbs , ',' ); if ( configdbs.size() != 1 && configdbs.size() != 3 ){ out() << "need either 1 or 3 configdbs" << endl; return 5; } + + // we either have a seeting were all process are in localhost or none is + for ( vector<string>::const_iterator it = configdbs.begin() ; it != configdbs.end() ; ++it ){ + try { + + HostAndPort configAddr( *it ); // will throw if address format is invalid + + if ( it == configdbs.begin() ){ + grid.setAllowLocalHost( configAddr.isLocalHost() ); + } + + if ( configAddr.isLocalHost() != grid.allowLocalHost() ){ + out() << "cannot mix localhost and ip addresses in configdbs" << endl; + return 10; + } + + } + catch ( DBException& e) { + out() << "configdb: " << e.what() << endl; + return 9; + } + } pool.addHook( &shardingConnectionHook ); @@ -213,24 +257,36 @@ int main(int argc, char* argv[], char *envp[] ) { printShardingVersionInfo(); if ( ! configServer.init( configdbs ) ){ - cout << "couldn't connectd to config db" << endl; + cout << "couldn't resolve config db address" << endl; return 7; } - if ( ! configServer.ok() ){ + if ( ! configServer.ok( true ) ){ cout << "configServer startup check failed" << endl; return 8; } - int configError = configServer.checkConfigVersion(); + int configError = configServer.checkConfigVersion( params.count( "upgrade" ) ); if ( configError ){ - cout << "config server error: " << configError << endl; + if ( configError > 0 ){ + cout << "upgrade success!" << endl; + } + else { + cout << "config server error: " << configError << endl; + } return configError; } configServer.reloadSettings(); - + init(); - start(); + + boost::thread web( webServerThread ); + + MessageServer::Options opts; + opts.port = cmdLine.port; + opts.ipList = cmdLine.bind_ip; + start(opts); + dbexit( EXIT_CLEAN ); return 0; } |