summaryrefslogtreecommitdiff
path: root/db/taskqueue.h
blob: c6a5667676d5c5499318235c45b6890652c6242d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
// @file taskqueue.h

/**
 *    Copyright (C) 2008 10gen Inc.
 *
 *    This program is free software: you can redistribute it and/or  modify
 *    it under the terms of the GNU Affero General Public License, version 3,
 *    as published by the Free Software Foundation.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    GNU Affero General Public License for more details.
 *
 *    You should have received a copy of the GNU Affero General Public License
 *    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#pragma once

#include "mongomutex.h"

namespace mongo {

    /** defer work items by queueing them for invocation by another thread.  presumption is that
        the consumer thread is outside of locks more than the source thread.  Additional presumption
        is that several objects or micro-tasks will be queued and that having a single thread
        process them in a batch is helpful, as they (in the first use case) use a common data
        structure that can then stay in the local cpu caches.

        this class is in db/ as it is dbMutex (mongomutex) specific (so far).

        using a functor instead of go() might be more elegant too, once again, would like to test any
        performance differential.  also worry that operator() hides things?

        MT - copyable "micro task" object we can queue
             must have a static method void MT::go(const MT&)

        see DefInvoke in dbtests/ for an example.
    */
    template< class MT >
    class TaskQueue {
    public:
        TaskQueue() : _which(0), _invokeMutex("deferredinvoker") { }

        void defer(MT mt) {
            // only one writer allowed.  however the invoke processing below can occur concurrently with
            // writes (for the most part)
            DEV dbMutex.assertWriteLocked();

            _queues[_which].push_back(mt);
        }

        /** call to process deferrals.

            concurrency: handled herein.  multiple threads could call invoke(), but their efforts will be
                         serialized.  the common case is that there is a single processor calling invoke().

            normally, you call this outside of any lock.  but if you want to fully drain the queue,
            call from within a read lock.  for example:
            {
              // drain with minimal time in lock
              d.invoke();
              readlock lk;
              d.invoke();
              ...
            }
            you can also call invoke periodically to do some work and then pick up later on more.
        */
        void invoke() {
            mutex::scoped_lock lk2(_invokeMutex);
            int toDrain = 0;
            {
                // flip queueing to the other queue (we are double buffered)
                readlocktry lk("", 5);
                if( !lk.got() )
                    return;
                toDrain = _which;
                _which = _which ^ 1;
                wassert( _queues[_which].empty() ); // we are in dbMutex, so it should be/stay empty til we exit dbMutex
            }

            _drain( _queues[toDrain] );
            assert( _queues[toDrain].empty() );
        }

    private:
        int _which; // 0 or 1
        typedef vector< MT > Queue;
        Queue _queues[2];

        // lock order when multiple locks: dbMutex, _invokeMutex
        mongo::mutex _invokeMutex;

        void _drain(Queue& queue) {
            unsigned oldCap = queue.capacity();
            for( typename Queue::iterator i = queue.begin(); i != queue.end(); i++ ) {
                const MT& v = *i;
                MT::go(v);
            }
            queue.clear();
            DEV assert( queue.capacity() == oldCap ); // just checking that clear() doesn't deallocate, we don't want that
        }
    };

}