summaryrefslogtreecommitdiff
path: root/s/cursors.cpp
blob: 23b8eaf695a24fb2261b6f5c505f423e34add662 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
// cursors.cpp

#include "stdafx.h"
#include "cursors.h"
#include "../client/connpool.h"
#include "../db/queryutil.h"

namespace mongo {
    
    // --------  ShardedCursor -----------

    ShardedClientCursor::ShardedClientCursor( QueryMessage& q , ClusteredCursor * cursor ){
        assert( cursor );
        _cursor = cursor;
        
        _skip = q.ntoskip;
        _ntoreturn = q.ntoreturn;
        
        _totalSent = 0;
        _done = false;

        do {
            // TODO: only create _id when needed
            _id = security.getNonce();
        } while ( _id == 0 );

    }

    ShardedClientCursor::~ShardedClientCursor(){
        assert( _cursor );
        delete _cursor;
        _cursor = 0;
    }

    bool ShardedClientCursor::sendNextBatch( Request& r , int ntoreturn ){
        uassert( 10191 ,  "cursor already done" , ! _done );
                
        int maxSize = 1024 * 1024;
        if ( _totalSent > 0 )
            maxSize *= 3;
        
        BufBuilder b(32768);
        
        int num = 0;
        bool sendMore = true;

        while ( _cursor->more() ){
            BSONObj o = _cursor->next();

            b.append( (void*)o.objdata() , o.objsize() );
            num++;
            
            if ( b.len() > maxSize ){
                break;
            }

            if ( num == ntoreturn ){
                // soft limit aka batch size
                break;
            }

            if ( ntoreturn != 0 && ( -1 * num + _totalSent ) == ntoreturn ){
                // hard limit - total to send
                sendMore = false;
                break;
            }
        }

        bool hasMore = sendMore && _cursor->more();
        log(6) << "\t hasMore:" << hasMore << " wouldSendMoreIfHad: " << sendMore << " id:" << _id << " totalSent: " << _totalSent << endl;
        
        replyToQuery( 0 , r.p() , r.m() , b.buf() , b.len() , num , _totalSent , hasMore ? _id : 0 );
        _totalSent += num;
        _done = ! hasMore;
        
        return hasMore;
    }
    

    CursorCache::CursorCache(){
    }

    CursorCache::~CursorCache(){
        // TODO: delete old cursors?
    }

    ShardedClientCursor* CursorCache::get( long long id ){
        map<long long,ShardedClientCursor*>::iterator i = _cursors.find( id );
        if ( i == _cursors.end() ){
            OCCASIONALLY log() << "Sharded CursorCache missing cursor id: " << id << endl;
            return 0;
        }
        return i->second;
    }
    
    void CursorCache::store( ShardedClientCursor * cursor ){
        _cursors[cursor->getId()] = cursor;
    }
    void CursorCache::remove( long long id ){
        _cursors.erase( id );
    }

    CursorCache cursorCache;
}