diff options
Diffstat (limited to 'util/message.cpp')
| -rw-r--r-- | util/message.cpp | 764 |
1 files changed, 0 insertions, 764 deletions
diff --git a/util/message.cpp b/util/message.cpp deleted file mode 100644 index bcb1772..0000000 --- a/util/message.cpp +++ /dev/null @@ -1,764 +0,0 @@ -/* message - - todo: authenticate; encrypt? -*/ - -/* Copyright 2009 10gen Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "pch.h" -#include "message.h" -#include <time.h> -#include "../util/goodies.h" -#include "../util/background.h" -#include <fcntl.h> -#include <errno.h> -#include "../db/cmdline.h" -#include "../client/dbclient.h" -#include "../util/time_support.h" - -#ifndef _WIN32 -# ifndef __sunos__ -# include <ifaddrs.h> -# endif -# include <sys/resource.h> -# include <sys/stat.h> -#else - -// errno doesn't work for winsock. -#undef errno -#define errno WSAGetLastError() - -#endif - -namespace mongo { - - bool noUnixSocket = false; - - bool objcheck = false; - - void checkTicketNumbers(); - -// if you want trace output: -#define mmm(x) - -#ifdef MSG_NOSIGNAL - const int portSendFlags = MSG_NOSIGNAL; - const int portRecvFlags = MSG_NOSIGNAL; -#else - const int portSendFlags = 0; - const int portRecvFlags = 0; -#endif - - const Listener* Listener::_timeTracker; - - string SocketException::toString() const { - stringstream ss; - ss << _ei.code << " socket exception [" << _type << "] "; - - if ( _server.size() ) - ss << "server [" << _server << "] "; - - if ( _extra.size() ) - ss << _extra; - - return ss.str(); - } - - - vector<SockAddr> ipToAddrs(const char* ips, int port) { - vector<SockAddr> out; - if (*ips == '\0') { - out.push_back(SockAddr("0.0.0.0", port)); // IPv4 all - - if (IPv6Enabled()) - out.push_back(SockAddr("::", port)); // IPv6 all -#ifndef _WIN32 - if (!noUnixSocket) - out.push_back(SockAddr(makeUnixSockPath(port).c_str(), port)); // Unix socket -#endif - return out; - } - - while(*ips) { - string ip; - const char * comma = strchr(ips, ','); - if (comma) { - ip = string(ips, comma - ips); - ips = comma + 1; - } - else { - ip = string(ips); - ips = ""; - } - - SockAddr sa(ip.c_str(), port); - out.push_back(sa); - -#ifndef _WIN32 - if (!noUnixSocket && (sa.getAddr() == "127.0.0.1" || sa.getAddr() == "0.0.0.0")) // only IPv4 - out.push_back(SockAddr(makeUnixSockPath(port).c_str(), port)); -#endif - } - return out; - - } - - /* listener ------------------------------------------------------------------- */ - - void Listener::initAndListen() { - checkTicketNumbers(); - vector<SockAddr> mine = ipToAddrs(_ip.c_str(), _port); - vector<int> socks; - SOCKET maxfd = 0; // needed for select() - - for (vector<SockAddr>::iterator it=mine.begin(), end=mine.end(); it != end; ++it) { - SockAddr& me = *it; - - SOCKET sock = ::socket(me.getType(), SOCK_STREAM, 0); - if ( sock == INVALID_SOCKET ) { - log() << "ERROR: listen(): invalid socket? " << errnoWithDescription() << endl; - } - - if (me.getType() == AF_UNIX) { -#if !defined(_WIN32) - if (unlink(me.getAddr().c_str()) == -1) { - int x = errno; - if (x != ENOENT) { - log() << "couldn't unlink socket file " << me << errnoWithDescription(x) << " skipping" << endl; - continue; - } - } -#endif - } - else if (me.getType() == AF_INET6) { - // IPv6 can also accept IPv4 connections as mapped addresses (::ffff:127.0.0.1) - // That causes a conflict if we don't do set it to IPV6_ONLY - const int one = 1; - setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (const char*) &one, sizeof(one)); - } - - prebindOptions( sock ); - - if ( ::bind(sock, me.raw(), me.addressSize) != 0 ) { - int x = errno; - log() << "listen(): bind() failed " << errnoWithDescription(x) << " for socket: " << me.toString() << endl; - if ( x == EADDRINUSE ) - log() << " addr already in use" << endl; - closesocket(sock); - return; - } - -#if !defined(_WIN32) - if (me.getType() == AF_UNIX) { - if (chmod(me.getAddr().c_str(), 0777) == -1) { - log() << "couldn't chmod socket file " << me << errnoWithDescription() << endl; - } - - ListeningSockets::get()->addPath( me.getAddr() ); - } -#endif - - if ( ::listen(sock, 128) != 0 ) { - log() << "listen(): listen() failed " << errnoWithDescription() << endl; - closesocket(sock); - return; - } - - ListeningSockets::get()->add( sock ); - - socks.push_back(sock); - if (sock > maxfd) - maxfd = sock; - } - - static long connNumber = 0; - struct timeval maxSelectTime; - while ( ! inShutdown() ) { - fd_set fds[1]; - FD_ZERO(fds); - - for (vector<int>::iterator it=socks.begin(), end=socks.end(); it != end; ++it) { - FD_SET(*it, fds); - } - - maxSelectTime.tv_sec = 0; - maxSelectTime.tv_usec = 10000; - const int ret = select(maxfd+1, fds, NULL, NULL, &maxSelectTime); - - if (ret == 0) { -#if defined(__linux__) - _elapsedTime += ( 10000 - maxSelectTime.tv_usec ) / 1000; -#else - _elapsedTime += 10; -#endif - continue; - } - _elapsedTime += ret; // assume 1ms to grab connection. very rough - - if (ret < 0) { - int x = errno; -#ifdef EINTR - if ( x == EINTR ) { - log() << "select() signal caught, continuing" << endl; - continue; - } -#endif - if ( ! inShutdown() ) - log() << "select() failure: ret=" << ret << " " << errnoWithDescription(x) << endl; - return; - } - - for (vector<int>::iterator it=socks.begin(), end=socks.end(); it != end; ++it) { - if (! (FD_ISSET(*it, fds))) - continue; - - SockAddr from; - int s = accept(*it, from.raw(), &from.addressSize); - if ( s < 0 ) { - int x = errno; // so no global issues - if ( x == ECONNABORTED || x == EBADF ) { - log() << "Listener on port " << _port << " aborted" << endl; - return; - } - if ( x == 0 && inShutdown() ) { - return; // socket closed - } - if( !inShutdown() ) - log() << "Listener: accept() returns " << s << " " << errnoWithDescription(x) << endl; - continue; - } - if (from.getType() != AF_UNIX) - disableNagle(s); - if ( _logConnect && ! cmdLine.quiet ) - log() << "connection accepted from " << from.toString() << " #" << ++connNumber << endl; - accepted(s, from); - } - } - } - - void Listener::accepted(int sock, const SockAddr& from) { - accepted( new MessagingPort(sock, from) ); - } - - /* messagingport -------------------------------------------------------------- */ - - class PiggyBackData { - public: - PiggyBackData( MessagingPort * port ) { - _port = port; - _buf = new char[1300]; - _cur = _buf; - } - - ~PiggyBackData() { - DESTRUCTOR_GUARD ( - flush(); - delete[]( _cur ); - ); - } - - void append( Message& m ) { - assert( m.header()->len <= 1300 ); - - if ( len() + m.header()->len > 1300 ) - flush(); - - memcpy( _cur , m.singleData() , m.header()->len ); - _cur += m.header()->len; - } - - void flush() { - if ( _buf == _cur ) - return; - - _port->send( _buf , len(), "flush" ); - _cur = _buf; - } - - int len() const { return _cur - _buf; } - - private: - MessagingPort* _port; - char * _buf; - char * _cur; - }; - - class Ports { - set<MessagingPort*> ports; - mongo::mutex m; - public: - Ports() : ports(), m("Ports") {} - void closeAll(unsigned skip_mask) { - scoped_lock bl(m); - for ( set<MessagingPort*>::iterator i = ports.begin(); i != ports.end(); i++ ) { - if( (*i)->tag & skip_mask ) - continue; - (*i)->shutdown(); - } - } - void insert(MessagingPort* p) { - scoped_lock bl(m); - ports.insert(p); - } - void erase(MessagingPort* p) { - scoped_lock bl(m); - ports.erase(p); - } - }; - - // we "new" this so it is still be around when other automatic global vars - // are being destructed during termination. - Ports& ports = *(new Ports()); - - void MessagingPort::closeAllSockets(unsigned mask) { - ports.closeAll(mask); - } - - MessagingPort::MessagingPort(int _sock, const SockAddr& _far) : sock(_sock), piggyBackData(0), _bytesIn(0), _bytesOut(0), farEnd(_far), _timeout(), tag(0) { - _logLevel = 0; - ports.insert(this); - } - - MessagingPort::MessagingPort( double timeout, int ll ) : _bytesIn(0), _bytesOut(0), tag(0) { - _logLevel = ll; - ports.insert(this); - sock = -1; - piggyBackData = 0; - _timeout = timeout; - } - - void MessagingPort::shutdown() { - if ( sock >= 0 ) { - closesocket(sock); - sock = -1; - } - } - - MessagingPort::~MessagingPort() { - if ( piggyBackData ) - delete( piggyBackData ); - shutdown(); - ports.erase(this); - } - - class ConnectBG : public BackgroundJob { - public: - ConnectBG(int sock, SockAddr farEnd) : _sock(sock), _farEnd(farEnd) { } - - void run() { _res = ::connect(_sock, _farEnd.raw(), _farEnd.addressSize); } - string name() const { return "ConnectBG"; } - int inError() const { return _res; } - - private: - int _sock; - int _res; - SockAddr _farEnd; - }; - - bool MessagingPort::connect(SockAddr& _far) { - farEnd = _far; - - sock = socket(farEnd.getType(), SOCK_STREAM, 0); - if ( sock == INVALID_SOCKET ) { - log(_logLevel) << "ERROR: connect invalid socket " << errnoWithDescription() << endl; - return false; - } - - if ( _timeout > 0 ) { - setSockTimeouts( sock, _timeout ); - } - - ConnectBG bg(sock, farEnd); - bg.go(); - if ( bg.wait(5000) ) { - if ( bg.inError() ) { - closesocket(sock); - sock = -1; - return false; - } - } - else { - // time out the connect - closesocket(sock); - sock = -1; - bg.wait(); // so bg stays in scope until bg thread terminates - return false; - } - - if (farEnd.getType() != AF_UNIX) - disableNagle(sock); - -#ifdef SO_NOSIGPIPE - // osx - const int one = 1; - setsockopt(sock, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(int)); -#endif - - /* - // SO_LINGER is bad - #ifdef SO_LINGER - struct linger ling; - ling.l_onoff = 1; - ling.l_linger = 0; - setsockopt(sock, SOL_SOCKET, SO_LINGER, (char *) &ling, sizeof(ling)); - #endif - */ - return true; - } - - bool MessagingPort::recv(Message& m) { - try { -again: - mmm( log() << "* recv() sock:" << this->sock << endl; ) - int len = -1; - - char *lenbuf = (char *) &len; - int lft = 4; - recv( lenbuf, lft ); - - if ( len < 16 || len > 48000000 ) { // messages must be large enough for headers - if ( len == -1 ) { - // Endian check from the client, after connecting, to see what mode server is running in. - unsigned foo = 0x10203040; - send( (char *) &foo, 4, "endian" ); - goto again; - } - - if ( len == 542393671 ) { - // an http GET - log(_logLevel) << "looks like you're trying to access db over http on native driver port. please add 1000 for webserver" << endl; - string msg = "You are trying to access MongoDB on the native driver port. For http diagnostic access, add 1000 to the port number\n"; - stringstream ss; - ss << "HTTP/1.0 200 OK\r\nConnection: close\r\nContent-Type: text/plain\r\nContent-Length: " << msg.size() << "\r\n\r\n" << msg; - string s = ss.str(); - send( s.c_str(), s.size(), "http" ); - return false; - } - log(0) << "recv(): message len " << len << " is too large" << len << endl; - return false; - } - - int z = (len+1023)&0xfffffc00; - assert(z>=len); - MsgData *md = (MsgData *) malloc(z); - assert(md); - md->len = len; - - char *p = (char *) &md->id; - int left = len -4; - - try { - recv( p, left ); - } - catch (...) { - free(md); - throw; - } - - _bytesIn += len; - m.setData(md, true); - return true; - - } - catch ( const SocketException & e ) { - log(_logLevel + (e.shouldPrint() ? 0 : 1) ) << "SocketException: remote: " << remote() << " error: " << e << endl; - m.reset(); - return false; - } - } - - void MessagingPort::reply(Message& received, Message& response) { - say(/*received.from, */response, received.header()->id); - } - - void MessagingPort::reply(Message& received, Message& response, MSGID responseTo) { - say(/*received.from, */response, responseTo); - } - - bool MessagingPort::call(Message& toSend, Message& response) { - mmm( log() << "*call()" << endl; ) - say(toSend); - return recv( toSend , response ); - } - - bool MessagingPort::recv( const Message& toSend , Message& response ) { - while ( 1 ) { - bool ok = recv(response); - if ( !ok ) - return false; - //log() << "got response: " << response.data->responseTo << endl; - if ( response.header()->responseTo == toSend.header()->id ) - break; - error() << "MessagingPort::call() wrong id got:" << hex << (unsigned)response.header()->responseTo << " expect:" << (unsigned)toSend.header()->id << '\n' - << dec - << " toSend op: " << (unsigned)toSend.operation() << '\n' - << " response msgid:" << (unsigned)response.header()->id << '\n' - << " response len: " << (unsigned)response.header()->len << '\n' - << " response op: " << response.operation() << '\n' - << " farEnd: " << farEnd << endl; - assert(false); - response.reset(); - } - mmm( log() << "*call() end" << endl; ) - return true; - } - - void MessagingPort::say(Message& toSend, int responseTo) { - assert( !toSend.empty() ); - mmm( log() << "* say() sock:" << this->sock << " thr:" << GetCurrentThreadId() << endl; ) - toSend.header()->id = nextMessageId(); - toSend.header()->responseTo = responseTo; - - if ( piggyBackData && piggyBackData->len() ) { - mmm( log() << "* have piggy back" << endl; ) - if ( ( piggyBackData->len() + toSend.header()->len ) > 1300 ) { - // won't fit in a packet - so just send it off - piggyBackData->flush(); - } - else { - piggyBackData->append( toSend ); - piggyBackData->flush(); - return; - } - } - - toSend.send( *this, "say" ); - } - - // sends all data or throws an exception - void MessagingPort::send( const char * data , int len, const char *context ) { - _bytesOut += len; - while( len > 0 ) { - int ret = ::send( sock , data , len , portSendFlags ); - if ( ret == -1 ) { - if ( errno != EAGAIN || _timeout == 0 ) { - SocketException::Type t = SocketException::SEND_ERROR; -#if defined(_WINDOWS) - if( e == WSAETIMEDOUT ) t = SocketException::SEND_TIMEOUT; -#endif - log(_logLevel) << "MessagingPort " << context << " send() " << errnoWithDescription() << ' ' << farEnd.toString() << endl; - throw SocketException( t ); - } - else { - if ( !serverAlive( farEnd.toString() ) ) { - log(_logLevel) << "MessagingPort " << context << " send() remote dead " << farEnd.toString() << endl; - throw SocketException( SocketException::SEND_ERROR ); - } - } - } - else { - assert( ret <= len ); - len -= ret; - data += ret; - } - } - } - - // sends all data or throws an exception - void MessagingPort::send( const vector< pair< char *, int > > &data, const char *context ) { -#if defined(_WIN32) - // TODO use scatter/gather api - for( vector< pair< char *, int > >::const_iterator i = data.begin(); i != data.end(); ++i ) { - char * data = i->first; - int len = i->second; - send( data, len, context ); - } -#else - vector< struct iovec > d( data.size() ); - int i = 0; - for( vector< pair< char *, int > >::const_iterator j = data.begin(); j != data.end(); ++j ) { - if ( j->second > 0 ) { - d[ i ].iov_base = j->first; - d[ i ].iov_len = j->second; - ++i; - } - } - struct msghdr meta; - memset( &meta, 0, sizeof( meta ) ); - meta.msg_iov = &d[ 0 ]; - meta.msg_iovlen = d.size(); - - while( meta.msg_iovlen > 0 ) { - int ret = ::sendmsg( sock , &meta , portSendFlags ); - if ( ret == -1 ) { - if ( errno != EAGAIN || _timeout == 0 ) { - log(_logLevel) << "MessagingPort " << context << " send() " << errnoWithDescription() << ' ' << farEnd.toString() << endl; - throw SocketException( SocketException::SEND_ERROR ); - } - else { - if ( !serverAlive( farEnd.toString() ) ) { - log(_logLevel) << "MessagingPort " << context << " send() remote dead " << farEnd.toString() << endl; - throw SocketException( SocketException::SEND_ERROR ); - } - } - } - else { - struct iovec *& i = meta.msg_iov; - while( ret > 0 ) { - if ( i->iov_len > unsigned( ret ) ) { - i->iov_len -= ret; - i->iov_base = (char*)(i->iov_base) + ret; - ret = 0; - } - else { - ret -= i->iov_len; - ++i; - --(meta.msg_iovlen); - } - } - } - } -#endif - } - - void MessagingPort::recv( char * buf , int len ) { - unsigned retries = 0; - while( len > 0 ) { - int ret = ::recv( sock , buf , len , portRecvFlags ); - if ( ret > 0 ) { - if ( len <= 4 && ret != len ) - log(_logLevel) << "MessagingPort recv() got " << ret << " bytes wanted len=" << len << endl; - assert( ret <= len ); - len -= ret; - buf += ret; - } - else if ( ret == 0 ) { - log(3) << "MessagingPort recv() conn closed? " << farEnd.toString() << endl; - throw SocketException( SocketException::CLOSED ); - } - else { /* ret < 0 */ - int e = errno; - -#if defined(EINTR) && !defined(_WIN32) - if( e == EINTR ) { - if( ++retries == 1 ) { - log() << "EINTR retry" << endl; - continue; - } - } -#endif - if ( ( e == EAGAIN -#ifdef _WINDOWS - || e == WSAETIMEDOUT -#endif - ) && _timeout > 0 ) { - // this is a timeout - log(_logLevel) << "MessagingPort recv() timeout " << farEnd.toString() <<endl; - throw SocketException(SocketException::RECV_TIMEOUT); - } - - log(_logLevel) << "MessagingPort recv() " << errnoWithDescription(e) << " " << farEnd.toString() <<endl; - throw SocketException(SocketException::RECV_ERROR); - } - } - } - - int MessagingPort::unsafe_recv( char *buf, int max ) { - return ::recv( sock , buf , max , portRecvFlags ); - } - - void MessagingPort::piggyBack( Message& toSend , int responseTo ) { - - if ( toSend.header()->len > 1300 ) { - // not worth saving because its almost an entire packet - say( toSend ); - return; - } - - // we're going to be storing this, so need to set it up - toSend.header()->id = nextMessageId(); - toSend.header()->responseTo = responseTo; - - if ( ! piggyBackData ) - piggyBackData = new PiggyBackData( this ); - - piggyBackData->append( toSend ); - } - - unsigned MessagingPort::remotePort() const { - return farEnd.getPort(); - } - - HostAndPort MessagingPort::remote() const { - if ( _farEndParsed.port() == -1 ) - _farEndParsed = HostAndPort( farEnd ); - return _farEndParsed; - } - - - MSGID NextMsgId; - - struct MsgStart { - MsgStart() { - NextMsgId = (((unsigned) time(0)) << 16) ^ curTimeMillis(); - assert(MsgDataHeaderSize == 16); - } - } msgstart; - - MSGID nextMessageId() { - MSGID msgid = NextMsgId++; - return msgid; - } - - bool doesOpGetAResponse( int op ) { - return op == dbQuery || op == dbGetMore; - } - - const int DEFAULT_MAX_CONN = 20000; - const int MAX_MAX_CONN = 20000; - - int getMaxConnections() { -#ifdef _WIN32 - return DEFAULT_MAX_CONN; -#else - struct rlimit limit; - assert( getrlimit(RLIMIT_NOFILE,&limit) == 0 ); - - int max = (int)(limit.rlim_cur * .8); - - log(1) << "fd limit" - << " hard:" << limit.rlim_max - << " soft:" << limit.rlim_cur - << " max conn: " << max - << endl; - - if ( max > MAX_MAX_CONN ) - max = MAX_MAX_CONN; - - return max; -#endif - } - - void checkTicketNumbers() { - int want = getMaxConnections(); - int current = connTicketHolder.outof(); - if ( current != DEFAULT_MAX_CONN ) { - if ( current < want ) { - // they want fewer than they can handle - // which is fine - log(1) << " only allowing " << current << " connections" << endl; - return; - } - if ( current > want ) { - log() << " --maxConns too high, can only handle " << want << endl; - } - } - connTicketHolder.resize( want ); - } - - TicketHolder connTicketHolder(DEFAULT_MAX_CONN); - -} // namespace mongo |
