summaryrefslogtreecommitdiff
path: root/src/pmlogreduce/scan.c
blob: e1bece775090230d7145987c7cd857bd698ed27a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
#include "pmlogreduce.h"

/*
 * Timestamp at which the next doscan() call resumes reading the input
 * archive: doscan() stores here the timestamp of the first record it
 * finds past the end of the interval being scanned.
 */
static struct timeval	last_tv = { 0, 0 };
/* handle from pmNewContext() for the input archive, -1 until doscan() first runs */
static int		ictx_b = -1;

/* defined elsewhere; not referenced in the code visible in this part of the file */
extern struct timeval	winstart_tval;

/*
 * This is the heart of the data reduction algorithm.  The term
 * metric-instance is used here to reflect the fact that this computation
 * has to be performed for every instance of every metric.
 *
 * 1. need to look at every input archive record going forward from the
 *    current point up to the last one <= end (time) ... so no interp
 *    mode here
 *
 * 2. for counter metric-instances, look for and count "wraps"
 *
 * 3. for instantaneous or discrete metric-instances with a numeric type,
 *    compute the arithmetic average of the observations over the
 *    interval
 *
 * 4. for _all_ metric-instances if there are no observations in the
 *    interval, then we'd like to suppress this metric-instance from the
 *    output archive
 *
 * 5. all of the above has to be done in a way that makes sense in the
 *    presence of mark records
 */

void
doscan(struct timeval *end)
{
    pmResult		*rp;
    value_t		*vp;
    int			sts;
    int			i;
    int			ir;
    int			nr;

    if (ictx_b == -1) {
	/*
	 * first time, create the record at a time mode context for the
	 * input archive
	 */
	if ((ictx_b = pmNewContext(PM_CONTEXT_ARCHIVE, iname)) < 0) {
	    fprintf(stderr, "%s: Error: cannot open archive \"%s\" (ctx_b): %s\n",
		    pmProgname, iname, pmErrStr(ictx_b));
	    exit(1);
	}

	if ((sts = pmSetMode(PM_MODE_FORW, NULL, 0)) < 0) {
	    fprintf(stderr,
		"%s: Error: pmSetMode (ictx_b) failed: %s\n", pmProgname, pmErrStr(sts));
	    exit(1);
	}
    }

    if ((sts = pmUseContext(ictx_b)) < 0) {
	fprintf(stderr, "%s: doscan: Error: cannot use context: %s\n",
		pmProgname, pmErrStr(sts));
	exit(1);
    }

    for (i = 0; i < numpmid; i++) {
	for (vp = metriclist[i].first; vp != NULL; vp = vp->next) {
	    vp->nobs = vp->nwrap = 0;
	    vp->control &= ~V_SEEN;
	}
    }

    for (nr = 0; ; nr++) {

	if ((sts = pmFetchArchive(&rp)) < 0) {
	    if (sts == PM_ERR_EOL)
		break;
	    fprintf(stderr,
		"%s: doscan: Error: pmFetch failed: %s\n", pmProgname, pmErrStr(sts));
	    exit(1);
	}
#if PCP_DEBUG
	if (pmDebug & DBG_TRACE_APPL2) {
	    if (nr == 0) {
		fprintf(stderr, "scan starts at ");
		__pmPrintStamp(stderr, &rp->timestamp);
		fprintf(stderr, "\n");
	    }
	}
#endif

	if (rp->numpmid == 0) {
	    /*
	     * Mark record ... copy into the output file as we cannot
	     * pretend there is data between the previous data record
	     * and the next data record
	     * Logic copied from pmlogextract.
	     */
	    struct {
		__pmPDU		len;
		__pmPDU		type;
		__pmPDU		from;
		__pmTimeval	timestamp;
		int		numpmid;	/* zero PMIDs to follow */
		__pmPDU		trailer;
	    } markrec;
	    /*
	     * add space for, but don't bump length for, trailer so
	     * __pmLogPutResult2() has space for trailer in the buffer
	     */
	    markrec.len = sizeof(markrec) - sizeof(__pmPDU);
	    markrec.type = markrec.from = 0;
	    markrec.timestamp.tv_sec = htonl(rp->timestamp.tv_sec);
	    markrec.timestamp.tv_usec = htonl(rp->timestamp.tv_usec);
	    markrec.numpmid = 0;
	    if ((sts = __pmLogPutResult2(&logctl, (__pmPDU *)&markrec)) < 0) {
		fprintf(stderr, "%s: Error: __pmLogPutResult2: mark record write: %s\n",
			pmProgname, pmErrStr(sts));
		exit(1);
	    }
	    /*
	     * continue on to check the interval range ... numpmid == 0
	     * makes the rest of the loop safe
	     */
	}

	if (rp->timestamp.tv_sec > end->tv_sec ||
	    (rp->timestamp.tv_sec == end->tv_sec &&
	     rp->timestamp.tv_usec > end->tv_usec)) {
	    /*
	     * past the end of the interval, remember timestamp so we
	     * can resume here next time
	     */
	    last_tv = rp->timestamp;	/* struct assignment */
	    pmFreeResult(rp);
	    break;
	}

	for (ir = 0; ir < rp->numpmid; ir++) {
	    pmValueSet		*vsp;
	    int			j;
	    metric_t		*mp;

	    vsp = rp->vset[ir];
	    if (vsp->numval <= 0)
		continue;

	    for (i = 0; i < numpmid; i++) {
		if (vsp->pmid == pmidlist[i])
		    break;
	    }
	    if (i == numpmid) {
		fprintf(stderr,
		    "%s: scan: Arrgh, cannot find pid %s in pidlist[]\n",
			pmProgname, pmIDStr(vsp->pmid));
		exit(1);
	    }
	    mp = &metriclist[i];
	    if (mp->mode == MODE_SKIP)
		continue;

	    for (j = 0; j < vsp->numval; j++) {
		value_t		*lvp = NULL;
		for (vp = mp->first; vp != NULL; vp = vp->next) {
		    if (vp->inst == vsp->vlist[j].inst)
			break;
		    lvp = vp;
		}
		if (vp == NULL) {
		    vp = (value_t *)malloc(sizeof(value_t));
		    if (vp == NULL) {
			fprintf(stderr,
			    "%s: rewrite: Arrgh, cannot malloc value_t\n", pmProgname);
			exit(1);
		    }
		    if (lvp == NULL)
			mp->first = vp;
		    else
			lvp->next = vp;
		    vp->inst = vsp->vlist[j].inst;
		    vp->nobs = vp->nwrap = 0;
		    vp->control = V_INIT;
		    vp->next = NULL;
#if PCP_DEBUG

		    if (pmDebug & DBG_TRACE_APPL1) {
			fprintf(stderr,
			    "add value_t for %s (%s) inst %d\n",
			    namelist[i], pmIDStr(pmidlist[i]), vsp->vlist[j].inst);
		    }
#endif
		}
		/* TODO ... hard part goes here 8^) */
		if (mp->idesc.sem == PM_SEM_COUNTER) {
		    /*
		     * OK, this metric is a counter, scan each instance
		     * looking for potential wraps
		     */
		    ;
		}
#if PCP_DEBUG
		if (pmDebug & DBG_TRACE_APPL1) {
		    __pmPrintStamp(stderr, &rp->timestamp);
		    fprintf(stderr, ": seen %s (%s) inst %d\n",
			namelist[i], pmIDStr(pmidlist[i]),
			vsp->vlist[j].inst);
		}
#endif
		vp->control |= V_SEEN;
	    }
	}

	pmFreeResult(rp);
    }
#if PCP_DEBUG
    if (pmDebug & DBG_TRACE_APPL2) {
	fprintf(stderr, "scan ends at ");
	__pmPrintStamp(stderr, &last_tv);
	if (sts == PM_ERR_EOL)
	    fprintf(stderr, " [EOL]");
	fprintf(stderr, " (%d records)\n", nr);
    }
#endif

    if ((sts = pmSetMode(PM_MODE_FORW, &last_tv, 0)) < 0) {
	fprintf(stderr,
	    "%s: doscan: Error: pmSetMode (ictx_b) time=", pmProgname);
	__pmPrintStamp(stderr, &last_tv);
	fprintf(stderr,
	    " failed: %s\n", pmErrStr(sts));
	exit(1);
    }
}