diff options
author | Antonin Kral <a.kral@bobek.cz> | 2010-08-11 12:38:57 +0200 |
---|---|---|
committer | Antonin Kral <a.kral@bobek.cz> | 2010-08-11 12:38:57 +0200 |
commit | 7645618fd3914cb8a20561625913c20d49504a49 (patch) | |
tree | 8370f846f58f6d71165b7a0e2eda04648584ec76 /db/oplogreader.h | |
parent | 68c73c3c7608b4c87f07440dc3232801720b1168 (diff) | |
download | mongodb-7645618fd3914cb8a20561625913c20d49504a49.tar.gz |
Imported Upstream version 1.6.0
Diffstat (limited to 'db/oplogreader.h')
-rw-r--r-- | db/oplogreader.h | 109 |
1 files changed, 109 insertions, 0 deletions
diff --git a/db/oplogreader.h b/db/oplogreader.h new file mode 100644 index 0000000..5c2881b --- /dev/null +++ b/db/oplogreader.h @@ -0,0 +1,109 @@ +/** @file oplogreader.h */ + +#pragma once + +#include "../client/dbclient.h" +#include "../client/constants.h" +#include "dbhelpers.h" + +namespace mongo { + + /* started abstracting out the querying of the primary/master's oplog + still fairly awkward but a start. + */ + class OplogReader { + auto_ptr<DBClientConnection> _conn; + auto_ptr<DBClientCursor> cursor; + public: + + OplogReader() { + DEV log() << "TEMP *** OplogReader()" << endl; + } + ~OplogReader() { + DEV log() << "TEMP *** ~OplogReader()" << endl; + } + + void resetCursor() { + DEV log() << "TEMP *** OplogReader::resetCursor" << endl; + cursor.reset(); + } + void resetConnection() { + DEV log() << "TEMP *** OplogReader::resetConnection" << endl; + cursor.reset(); + _conn.reset(); + } + DBClientConnection* conn() { return _conn.get(); } + BSONObj findOne(const char *ns, const Query& q) { + return conn()->findOne(ns, q); + } + + BSONObj getLastOp(const char *ns) { + return findOne(ns, Query().sort(reverseNaturalObj)); + } + + /* ok to call if already connected */ + bool connect(string hostname); + + void tailCheck() { + if( cursor.get() && cursor->isDead() ) { + log() << "repl: old cursor isDead, will initiate a new one" << endl; + resetCursor(); + } + } + + bool haveCursor() { return cursor.get() != 0; } + + void query(const char *ns, const BSONObj& query) { + assert( !haveCursor() ); + cursor = _conn->query(ns, query, 0, 0, 0, QueryOption_SlaveOk); + } + + void tailingQuery(const char *ns, const BSONObj& query) { + assert( !haveCursor() ); + log(2) << "repl: " << ns << ".find(" << query.toString() << ')' << endl; + cursor = _conn->query( ns, query, 0, 0, 0, + QueryOption_CursorTailable | QueryOption_SlaveOk | QueryOption_OplogReplay | + /* TODO: slaveok maybe shouldn't use? */ + QueryOption_AwaitData + ); + } + + void tailingQueryGTE(const char *ns, OpTime t) { + BSONObjBuilder q; + q.appendDate("$gte", t.asDate()); + BSONObjBuilder query; + query.append("ts", q.done()); + tailingQuery(ns, query.done()); + } + + bool more() { + assert( cursor.get() ); + return cursor->more(); + } + bool moreInCurrentBatch() { + assert( cursor.get() ); + return cursor->moreInCurrentBatch(); + } + + /* old mongod's can't do the await flag... */ + bool awaitCapable() { + return cursor->hasResultFlag(ResultFlag_AwaitCapable); + } + + void peek(vector<BSONObj>& v, int n) { + if( cursor.get() ) + cursor->peek(v,n); + } + + BSONObj nextSafe() { return cursor->nextSafe(); } + + BSONObj next() { + return cursor->next(); + } + + void putBack(BSONObj op) { + cursor->putBack(op); + } + }; + +} |