summaryrefslogtreecommitdiff
path: root/db/oplogreader.h
blob: 5c2881b51a44fe60223e0222d6a8e3f42aa3f01a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
/** @file oplogreader.h */

#pragma once

#include "../client/dbclient.h"
#include "../client/constants.h"
#include "dbhelpers.h"

namespace mongo {

    /* started abstracting out the querying of the primary/master's oplog 
       still fairly awkward but a start.
    */
    class OplogReader {
        auto_ptr<DBClientConnection> _conn;
        auto_ptr<DBClientCursor> cursor;
    public:

        OplogReader() { 
            DEV log() << "TEMP *** OplogReader()" << endl;
        }
        ~OplogReader() { 
            DEV log() << "TEMP *** ~OplogReader()" << endl;
        }

        void resetCursor() {
            DEV log() << "TEMP *** OplogReader::resetCursor" << endl;
            cursor.reset();
        }
        void resetConnection() {
            DEV log() << "TEMP *** OplogReader::resetConnection" << endl;
            cursor.reset();
            _conn.reset();
        }
        DBClientConnection* conn() { return _conn.get(); }
        BSONObj findOne(const char *ns, const Query& q) { 
            return conn()->findOne(ns, q);
        }

        BSONObj getLastOp(const char *ns) { 
            return findOne(ns, Query().sort(reverseNaturalObj));
        }

        /* ok to call if already connected */
        bool connect(string hostname);

        void tailCheck() {
            if( cursor.get() && cursor->isDead() ) { 
                log() << "repl: old cursor isDead, will initiate a new one" << endl;
                resetCursor();
            }
        }

        bool haveCursor() { return cursor.get() != 0; }

        void query(const char *ns, const BSONObj& query) { 
            assert( !haveCursor() );
            cursor = _conn->query(ns, query, 0, 0, 0, QueryOption_SlaveOk);
        }

        void tailingQuery(const char *ns, const BSONObj& query) { 
            assert( !haveCursor() );
            log(2) << "repl: " << ns << ".find(" << query.toString() << ')' << endl;
            cursor = _conn->query( ns, query, 0, 0, 0, 
                                  QueryOption_CursorTailable | QueryOption_SlaveOk | QueryOption_OplogReplay |
                                  /* TODO: slaveok maybe shouldn't use? */
                                  QueryOption_AwaitData
                                  );
        }

        void tailingQueryGTE(const char *ns, OpTime t) {
            BSONObjBuilder q;
            q.appendDate("$gte", t.asDate());
            BSONObjBuilder query;
            query.append("ts", q.done());
            tailingQuery(ns, query.done());
        }

        bool more() { 
            assert( cursor.get() );
            return cursor->more();
        }
        bool moreInCurrentBatch() { 
            assert( cursor.get() );
            return cursor->moreInCurrentBatch();
        }

        /* old mongod's can't do the await flag... */
        bool awaitCapable() { 
            return cursor->hasResultFlag(ResultFlag_AwaitCapable);
        }

        void peek(vector<BSONObj>& v, int n) { 
            if( cursor.get() )
                cursor->peek(v,n);
        }

        BSONObj nextSafe() { return cursor->nextSafe(); }

        BSONObj next() { 
            return cursor->next();
        }

        void putBack(BSONObj op) { 
            cursor->putBack(op);
        }
    };
    
}