diff options
Diffstat (limited to 'util/message.h')
-rw-r--r-- | util/message.h | 207 |
1 files changed, 207 insertions, 0 deletions
diff --git a/util/message.h b/util/message.h new file mode 100644 index 0000000..8d6a46e --- /dev/null +++ b/util/message.h @@ -0,0 +1,207 @@ +// message.h + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "../util/sock.h" + +namespace mongo { + + class Message; + class MessagingPort; + class PiggyBackData; + typedef WrappingInt MSGID; + + class Listener { + public: + Listener(const string &_ip, int p) : ip(_ip), port(p) { } + virtual ~Listener() {} + bool init(); // set up socket + int socket() const { return sock; } + void listen(); // never returns (start a thread) + + /* spawn a thread, etc., then return */ + virtual void accepted(MessagingPort *mp) = 0; + private: + string ip; + int port; + int sock; + }; + + class AbstractMessagingPort { + public: + virtual ~AbstractMessagingPort() { } + virtual void reply(Message& received, Message& response, MSGID responseTo) = 0; // like the reply below, but doesn't rely on received.data still being available + virtual void reply(Message& received, Message& response) = 0; + + virtual unsigned remotePort() = 0 ; + }; + + class MessagingPort : public AbstractMessagingPort { + public: + MessagingPort(int sock, SockAddr& farEnd); + MessagingPort(); + virtual ~MessagingPort(); + + void shutdown(); + + bool connect(SockAddr& farEnd); + + /* it's assumed if you reuse a message object, that it doesn't cross MessagingPort's. + also, the Message data will go out of scope on the subsequent recv call. + */ + bool recv(Message& m); + void reply(Message& received, Message& response, MSGID responseTo); + void reply(Message& received, Message& response); + bool call(Message& toSend, Message& response); + void say(Message& toSend, int responseTo = -1); + + void piggyBack( Message& toSend , int responseTo = -1 ); + + virtual unsigned remotePort(); + private: + int sock; + PiggyBackData * piggyBackData; + public: + SockAddr farEnd; + + friend class PiggyBackData; + }; + + //#pragma pack() +#pragma pack(1) + + enum Operations { + opReply = 1, /* reply. responseTo is set. */ + dbMsg = 1000, /* generic msg command followed by a string */ + dbUpdate = 2001, /* update object */ + dbInsert = 2002, + //dbGetByOID = 2003, + dbQuery = 2004, + dbGetMore = 2005, + dbDelete = 2006, + dbKillCursors = 2007 + }; + + bool doesOpGetAResponse( int op ); + + struct MsgData { + int len; /* len of the msg, including this field */ + MSGID id; /* request/reply id's match... */ + MSGID responseTo; /* id of the message we are responding to */ + int _operation; + int operation() const { + return _operation; + } + void setOperation(int o) { + _operation = o; + } + char _data[4]; + + int& dataAsInt() { + return *((int *) _data); + } + + bool valid(){ + if ( len <= 0 || len > ( 1024 * 1024 * 10 ) ) + return false; + if ( _operation < 0 || _operation > 100000 ) + return false; + return true; + } + + int dataLen(); // len without header + }; + const int MsgDataHeaderSize = sizeof(MsgData) - 4; + inline int MsgData::dataLen() { + return len - MsgDataHeaderSize; + } + +#pragma pack() + + class Message { + public: + Message() { + data = 0; + freeIt = false; + } + Message( void * _data , bool _freeIt ) { + data = (MsgData*)_data; + freeIt = _freeIt; + }; + ~Message() { + reset(); + } + + SockAddr from; + MsgData *data; + + Message& operator=(Message& r) { + assert( data == 0 ); + data = r.data; + assert( r.freeIt ); + r.freeIt = false; + r.data = 0; + freeIt = true; + return *this; + } + + void reset() { + if ( freeIt && data ) + free(data); + data = 0; + freeIt = false; + } + + void setData(MsgData *d, bool _freeIt) { + assert( data == 0 ); + freeIt = _freeIt; + data = d; + } + void setData(int operation, const char *msgtxt) { + setData(operation, msgtxt, strlen(msgtxt)+1); + } + void setData(int operation, const char *msgdata, int len) { + assert(data == 0); + int dataLen = len + sizeof(MsgData) - 4; + MsgData *d = (MsgData *) malloc(dataLen); + memcpy(d->_data, msgdata, len); + d->len = fixEndian(dataLen); + d->setOperation(operation); + freeIt= true; + data = d; + } + + bool doIFreeIt() { + return freeIt; + } + + private: + bool freeIt; + }; + + class SocketException : public DBException { + public: + virtual const char* what() const throw() { return "socket exception"; } + virtual int getCode(){ return 9001; } + }; + + MSGID nextMessageId(); + + void setClientId( int id ); + int getClientId(); +} // namespace mongo |