diff options
author | Igor Pashev <pashev.igor@gmail.com> | 2014-10-26 12:33:50 +0400 |
---|---|---|
committer | Igor Pashev <pashev.igor@gmail.com> | 2014-10-26 12:33:50 +0400 |
commit | 47e6e7c84f008a53061e661f31ae96629bc694ef (patch) | |
tree | 648a07f3b5b9d67ce19b0fd72e8caa1175c98f1a /src/pmlogreduce/scan.c | |
download | pcp-debian/3.9.10.tar.gz |
Debian 3.9.10debian/3.9.10debian
Diffstat (limited to 'src/pmlogreduce/scan.c')
-rw-r--r-- | src/pmlogreduce/scan.c | 232 |
1 files changed, 232 insertions, 0 deletions
diff --git a/src/pmlogreduce/scan.c b/src/pmlogreduce/scan.c new file mode 100644 index 0000000..e1bece7 --- /dev/null +++ b/src/pmlogreduce/scan.c @@ -0,0 +1,232 @@ +#include "pmlogreduce.h" + +static struct timeval last_tv = { 0, 0 }; +static int ictx_b = -1; + +extern struct timeval winstart_tval; + +/* + * This is the heart of the data reduction algorithm. The term + * metric-instance is used here to reflect the fact that this computation + * has to be performed for every instance of every metric. + * + * 1. need to look at every input archive record going forward from the + * current point up to the last one <= end (time) ... so no interp + * mode here + * + * 2. for counter metric-instances, look for and count "wraps" + * + * 3. for instantenous or discrete metric-instances with a numeric type, + * compute the arithmetic average of the observations over the + * interval + * + * 4. for _all_ metric-instances if there are no observations in the + * interval, then we'd like to supress this metric-instance from the + * output archive + * + * 5. all of the above has to be done in a way that makes sense in the + * presence of mark records + */ + +void +doscan(struct timeval *end) +{ + pmResult *rp; + value_t *vp; + int sts; + int i; + int ir; + int nr; + + if (ictx_b == -1) { + /* + * first time, create the record at a time mode context for the + * input archive + */ + if ((ictx_b = pmNewContext(PM_CONTEXT_ARCHIVE, iname)) < 0) { + fprintf(stderr, "%s: Error: cannot open archive \"%s\" (ctx_b): %s\n", + pmProgname, iname, pmErrStr(ictx_b)); + exit(1); + } + + if ((sts = pmSetMode(PM_MODE_FORW, NULL, 0)) < 0) { + fprintf(stderr, + "%s: Error: pmSetMode (ictx_b) failed: %s\n", pmProgname, pmErrStr(sts)); + exit(1); + } + } + + if ((sts = pmUseContext(ictx_b)) < 0) { + fprintf(stderr, "%s: doscan: Error: cannot use context: %s\n", + pmProgname, pmErrStr(sts)); + exit(1); + } + + for (i = 0; i < numpmid; i++) { + for (vp = metriclist[i].first; vp != NULL; vp = vp->next) { + vp->nobs = vp->nwrap = 0; + vp->control &= ~V_SEEN; + } + } + + for (nr = 0; ; nr++) { + + if ((sts = pmFetchArchive(&rp)) < 0) { + if (sts == PM_ERR_EOL) + break; + fprintf(stderr, + "%s: doscan: Error: pmFetch failed: %s\n", pmProgname, pmErrStr(sts)); + exit(1); + } +#if PCP_DEBUG + if (pmDebug & DBG_TRACE_APPL2) { + if (nr == 0) { + fprintf(stderr, "scan starts at "); + __pmPrintStamp(stderr, &rp->timestamp); + fprintf(stderr, "\n"); + } + } +#endif + + if (rp->numpmid == 0) { + /* + * Mark record ... copy into the output file as we cannot + * pretend there is data between the previous data record + * and the next data record + * Logic copied from pmlogextract. + */ + struct { + __pmPDU len; + __pmPDU type; + __pmPDU from; + __pmTimeval timestamp; + int numpmid; /* zero PMIDs to follow */ + __pmPDU trailer; + } markrec; + /* + * add space for, but don't bump length for, trailer so + * __pmLogPutResult2() has space for trailer in the buffer + */ + markrec.len = sizeof(markrec) - sizeof(__pmPDU); + markrec.type = markrec.from = 0; + markrec.timestamp.tv_sec = htonl(rp->timestamp.tv_sec); + markrec.timestamp.tv_usec = htonl(rp->timestamp.tv_usec); + markrec.numpmid = 0; + if ((sts = __pmLogPutResult2(&logctl, (__pmPDU *)&markrec)) < 0) { + fprintf(stderr, "%s: Error: __pmLogPutResult2: mark record write: %s\n", + pmProgname, pmErrStr(sts)); + exit(1); + } + /* + * continue on to check the interval range ... numpmid == 0 + * makes the rest of the loop safe + */ + } + + if (rp->timestamp.tv_sec > end->tv_sec || + (rp->timestamp.tv_sec == end->tv_sec && + rp->timestamp.tv_usec > end->tv_usec)) { + /* + * past the end of the interval, remember timestamp so we + * can resume here next time + */ + last_tv = rp->timestamp; /* struct assignment */ + pmFreeResult(rp); + break; + } + + for (ir = 0; ir < rp->numpmid; ir++) { + pmValueSet *vsp; + int j; + metric_t *mp; + + vsp = rp->vset[ir]; + if (vsp->numval <= 0) + continue; + + for (i = 0; i < numpmid; i++) { + if (vsp->pmid == pmidlist[i]) + break; + } + if (i == numpmid) { + fprintf(stderr, + "%s: scan: Arrgh, cannot find pid %s in pidlist[]\n", + pmProgname, pmIDStr(vsp->pmid)); + exit(1); + } + mp = &metriclist[i]; + if (mp->mode == MODE_SKIP) + continue; + + for (j = 0; j < vsp->numval; j++) { + value_t *lvp = NULL; + for (vp = mp->first; vp != NULL; vp = vp->next) { + if (vp->inst == vsp->vlist[j].inst) + break; + lvp = vp; + } + if (vp == NULL) { + vp = (value_t *)malloc(sizeof(value_t)); + if (vp == NULL) { + fprintf(stderr, + "%s: rewrite: Arrgh, cannot malloc value_t\n", pmProgname); + exit(1); + } + if (lvp == NULL) + mp->first = vp; + else + lvp->next = vp; + vp->inst = vsp->vlist[j].inst; + vp->nobs = vp->nwrap = 0; + vp->control = V_INIT; + vp->next = NULL; +#if PCP_DEBUG + + if (pmDebug & DBG_TRACE_APPL1) { + fprintf(stderr, + "add value_t for %s (%s) inst %d\n", + namelist[i], pmIDStr(pmidlist[i]), vsp->vlist[j].inst); + } +#endif + } + /* TODO ... hard part goes here 8^) */ + if (mp->idesc.sem == PM_SEM_COUNTER) { + /* + * OK, this metric is a counter, scan each instance + * looking for potential wraps + */ + ; + } +#if PCP_DEBUG + if (pmDebug & DBG_TRACE_APPL1) { + __pmPrintStamp(stderr, &rp->timestamp); + fprintf(stderr, ": seen %s (%s) inst %d\n", + namelist[i], pmIDStr(pmidlist[i]), + vsp->vlist[j].inst); + } +#endif + vp->control |= V_SEEN; + } + } + + pmFreeResult(rp); + } +#if PCP_DEBUG + if (pmDebug & DBG_TRACE_APPL2) { + fprintf(stderr, "scan ends at "); + __pmPrintStamp(stderr, &last_tv); + if (sts == PM_ERR_EOL) + fprintf(stderr, " [EOL]"); + fprintf(stderr, " (%d records)\n", nr); + } +#endif + + if ((sts = pmSetMode(PM_MODE_FORW, &last_tv, 0)) < 0) { + fprintf(stderr, + "%s: doscan: Error: pmSetMode (ictx_b) time=", pmProgname); + __pmPrintStamp(stderr, &last_tv); + fprintf(stderr, + " failed: %s\n", pmErrStr(sts)); + exit(1); + } +} |