summaryrefslogtreecommitdiff
path: root/runtime/batch.h
blob: e3fa04518a845b39f3e8f97e872e25dc31a8e054 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
/* Definition of the batch_t data structure.
 * I am not sure yet if this will become a full-blown object. For now, this header just
 * includes the object definition and is not accompanied by code.
 *
 * Copyright 2009-2013 by Rainer Gerhards and Adiscon GmbH.
 *
 * This file is part of the rsyslog runtime library.
 *
 * The rsyslog runtime library is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * The rsyslog runtime library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with the rsyslog runtime library.  If not, see <http://www.gnu.org/licenses/>.
 *
 * A copy of the GPL can be found in the file "COPYING" in this distribution.
 * A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution.
 */

#ifndef BATCH_H_INCLUDED
#define BATCH_H_INCLUDED

#include <string.h>
#include "msg.h"

/* enum for batch states. Actually, we violate a layer here, in that we assume that a batch is used
 * for action processing. So far, this seems acceptable, the status is simply ignored inside the
 * main message queue. But over time, it could potentially be useful to split the two.
 * rgerhad, 2009-05-12
 */
#define BATCH_STATE_RDY  0	/* object ready for processing */
#define BATCH_STATE_BAD  1	/* unrecoverable failure while processing, do NOT resubmit to same action */
#define BATCH_STATE_SUB  2	/* message submitted for processing, outcome yet unknown */
#define BATCH_STATE_COMM 3	/* message successfully commited */
#define BATCH_STATE_DISC 4 	/* discarded - processed OK, but do not submit to any other action */
typedef unsigned char batch_state_t;


/* an object inside a batch, including any information (state!) needed for it to "life".
 */
struct batch_obj_s {
	msg_t *pMsg;
};

/* the batch
 * This object is used to dequeue multiple user pointers which are than handed over
 * to processing. The size of elements is fixed after queue creation, but may be 
 * modified by config variables (better said: queue properties).
 * Note that a "user pointer" in rsyslog context so far always is a message 
 * object. We stick to the more generic term because queues may potentially hold
 * other types of objects, too.
 * rgerhards, 2009-05-12
 * Note that nElem is not necessarily equal to nElemDeq. This is the case when we
 * discard some elements (because of configuration) during dequeue processing. As
 * all Elements are only deleted when the batch is processed, we can not immediately
 * delete them. So we need to keep their number that we can delete them when the batch
 * is completed (else, the whole process does not work correctly).
 */
struct batch_s {
	int maxElem;		/* maximum number of elements that this batch supports */
	int nElem;		/* actual number of element in this entry */
	int nElemDeq;		/* actual number of elements dequeued (and thus to be deleted) - see comment above! */
	qDeqID	deqID;		/* ID of dequeue operation that generated this batch */
	batch_obj_t *pElem;	/* batch elements */
	batch_state_t *eltState;/* state (array!) for individual objects.
	   			   NOTE: we have moved this out of batch_obj_t because we
				         get a *much* better cache hit ratio this way. So do not
					 move it back into this structure! Note that this is really
					 a HUGE saving, even if it doesn't look so (both profiler
					 data as well as practical tests indicate that!).
				*/
};


/* get number of msgs for this batch */
static inline int
batchNumMsgs(const batch_t * const pBatch) {
	return pBatch->nElem;
}


/* set the status of the i-th batch element. Note that once the status is
 * DISC, it will never be reset. So this function can NOT be used to initialize
 * the state table. -- rgerhards, 2010-06-10
 */
static inline void
batchSetElemState(batch_t * const pBatch, const int i, const batch_state_t newState) {
	if(pBatch->eltState[i] != BATCH_STATE_DISC)
		pBatch->eltState[i] = newState;
}


/* check if an element is a valid entry. We do NOT verify if the
 * element index is valid. -- rgerhards, 2010-06-10
 */
static inline int
batchIsValidElem(const batch_t * const pBatch, const int i) {
	return(pBatch->eltState[i] != BATCH_STATE_DISC);
}


/* free members of a batch "object". Note that we can not do the usual
 * destruction as the object typically is allocated on the stack and so the
 * object itself cannot be freed! -- rgerhards, 2010-06-15
 */
static inline void
batchFree(batch_t * const pBatch) {
	free(pBatch->pElem);
	free(pBatch->eltState);
}


/* initialiaze a batch "object". The record must already exist,
 * we "just" initialize it. The max number of elements must be
 * provided. -- rgerhards, 2010-06-15
 */
static inline rsRetVal
batchInit(batch_t *const pBatch, const int maxElem) {
	DEFiRet;
	pBatch->maxElem = maxElem;
	CHKmalloc(pBatch->pElem = calloc((size_t)maxElem, sizeof(batch_obj_t)));
	CHKmalloc(pBatch->eltState = calloc((size_t)maxElem, sizeof(batch_state_t)));
finalize_it:
	RETiRet;
}


/* primarily a helper for debug purposes, get human-readble name of state */
static inline char *
batchState2String(const batch_state_t state) {
	switch(state) {
	case BATCH_STATE_RDY:
		return "BATCH_STATE_RDY";
	case BATCH_STATE_BAD:
		return "BATCH_STATE_BAD";
	case BATCH_STATE_SUB:
		return "BATCH_STATE_SUB";
	case BATCH_STATE_COMM:
		return "BATCH_STATE_COMM";
	case BATCH_STATE_DISC:
		return "BATCH_STATE_DISC";
	}
	return "ERROR, batch state not known!";
}
#endif /* #ifndef BATCH_H_INCLUDED */