summaryrefslogtreecommitdiff
path: root/src/pmlogreduce
diff options
context:
space:
mode:
Diffstat (limited to 'src/pmlogreduce')
-rw-r--r--src/pmlogreduce/GNUmakefile43
-rw-r--r--src/pmlogreduce/dometric.c155
-rw-r--r--src/pmlogreduce/indom.c103
-rw-r--r--src/pmlogreduce/logio.c260
-rw-r--r--src/pmlogreduce/pmlogreduce.c490
-rw-r--r--src/pmlogreduce/pmlogreduce.h85
-rw-r--r--src/pmlogreduce/rewrite.c178
-rw-r--r--src/pmlogreduce/scan.c232
8 files changed, 1546 insertions, 0 deletions
diff --git a/src/pmlogreduce/GNUmakefile b/src/pmlogreduce/GNUmakefile
new file mode 100644
index 0000000..4fd88d6
--- /dev/null
+++ b/src/pmlogreduce/GNUmakefile
@@ -0,0 +1,43 @@
+#
+# Copyright (c) 2004 Silicon Graphics, Inc. All Rights Reserved.
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by the
+# Free Software Foundation; either version 2 of the License, or (at your
+# option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+# for more details.
+#
+
+TOPDIR = ../..
+include $(TOPDIR)/src/include/builddefs
+
+CFILES = pmlogreduce.c logio.c dometric.c rewrite.c indom.c scan.c
+HFILES = pmlogreduce.h
+
+CMDTARGET = pmlogreduce$(EXECSUFFIX)
+LLDLIBS = $(PCPLIB)
+
+default: $(CMDTARGET)
+
+include $(BUILDRULES)
+
+pmlogreduce : $(OBJECTS)
+
+install: $(CMDTARGET)
+ $(INSTALL) -m 755 $(CMDTARGET) $(PCP_BINADM_DIR)/$(CMDTARGET)
+
+pmlogreduce.o: pmlogreduce.h
+logio.o: pmlogreduce.h
+dometric.o: pmlogreduce.h
+rewrite.o: pmlogreduce.h
+indom.o: pmlogreduce.h
+wrap.o: pmlogreduce.h
+scan.o: pmlogreduce.h
+
+default_pcp : default
+
+install_pcp : install
diff --git a/src/pmlogreduce/dometric.c b/src/pmlogreduce/dometric.c
new file mode 100644
index 0000000..6ba0fdc
--- /dev/null
+++ b/src/pmlogreduce/dometric.c
@@ -0,0 +1,155 @@
+#include "pmlogreduce.h"
+
+void
+dometric(const char *name)
+{
+ int sts;
+ metric_t *mp;
+
+ if ((namelist = (char **)realloc(namelist, (numpmid+1)*sizeof(namelist[0]))) == NULL) {
+ fprintf(stderr,
+ "%s: dometric: Error: cannot realloc space for %d names\n",
+ pmProgname, numpmid+1);
+ exit(1);
+ }
+ namelist[numpmid] = strdup(name);
+ if ((pmidlist = (pmID *)realloc(pmidlist, (numpmid+1)*sizeof(pmidlist[0]))) == NULL) {
+ fprintf(stderr,
+ "%s: dometric: Error: cannot realloc space for %d pmIDs\n",
+ pmProgname, numpmid+1);
+ exit(1);
+ }
+ if ((sts = pmLookupName(1, (char **)&name, &pmidlist[numpmid])) < 0) {
+ fprintf(stderr,
+ "%s: dometric: Error: cannot lookup pmID for metric \"%s\": %s\n",
+ pmProgname, name, pmErrStr(sts));
+ exit(1);
+ }
+ if ((metriclist = (metric_t *)realloc(metriclist, (numpmid+1)*sizeof(metriclist[0]))) == NULL) {
+ fprintf(stderr,
+ "%s: dometric: Error: cannot realloc space for %d metric_t's\n",
+ pmProgname, numpmid+1);
+ exit(1);
+ }
+ mp = &metriclist[numpmid];
+ mp->first = NULL;
+ if ((sts = pmLookupDesc(pmidlist[numpmid], &mp->idesc)) < 0) {
+ fprintf(stderr,
+ "%s: dometric: Error: cannot lookup pmDesc for metric \"%s\": %s\n",
+ pmProgname, name, pmErrStr(sts));
+ exit(1);
+ }
+ mp->odesc = mp->idesc; /* struct assignment */
+ mp->mode = MODE_NORMAL;
+ mp->idp = NULL;
+
+ /*
+ * some metrics cannot sensibly be processed ... skip these ones
+ */
+ if (mp->idesc.type == PM_TYPE_AGGREGATE ||
+ mp->idesc.type == PM_TYPE_AGGREGATE_STATIC ||
+ mp->idesc.type == PM_TYPE_EVENT) {
+ fprintf(stderr,
+ "%s: %s: Warning: skipping %s metric\n",
+ pmProgname, name, pmTypeStr(mp->idesc.type));
+ mp->mode = MODE_SKIP;
+ goto done;
+ }
+
+ /*
+ * input -> output descriptor mapping ... has to be the same
+ * logic as we apply to the pmResults later on.
+ */
+ switch (mp->idesc.sem) {
+ case PM_SEM_COUNTER:
+ switch (mp->idesc.type) {
+ case PM_TYPE_32:
+ mp->odesc.type = PM_TYPE_64;
+ mp->mode = MODE_REWRITE;
+ break;
+ case PM_TYPE_U32:
+ mp->odesc.type = PM_TYPE_U64;
+ mp->mode = MODE_REWRITE;
+ break;
+ }
+#if 0
+ mp->odesc.sem = PM_SEM_INSTANT;
+ if (mp->idesc.units.dimTime == 0) {
+ /* rate convert */
+ mp->odesc.units.dimTime = -1;
+ mp->odesc.units.scaleTime = PM_TIME_SEC;
+ }
+ else if (mp->idesc.units.dimTime == 1) {
+ /* becomes (time) utilization */
+ mp->odesc.units.dimTime = 0;
+ mp->odesc.units.scaleTime = 0;
+ }
+ else {
+ fprintf(stderr, "Cannot rate convert \"%s\" yet,", namelist[numpmid]);
+ __pmPrintDesc(stderr, &mp->idesc);
+ exit(1);
+ }
+ break;
+#endif
+ }
+#if PCP_DEBUG
+ if (pmDebug & DBG_TRACE_APPL0) {
+ fprintf(stderr, "metric: \"%s\" (%s)\n",
+ namelist[numpmid], pmIDStr(pmidlist[numpmid]));
+ fprintf(stderr, "input descriptor:\n");
+ __pmPrintDesc(stderr, &mp->idesc);
+ fprintf(stderr, "output descriptor (added to archive):\n");
+ __pmPrintDesc(stderr, &mp->odesc);
+ }
+#endif
+
+ if ((sts = __pmLogPutDesc(&logctl, &mp->odesc, 1, &namelist[numpmid])) < 0) {
+ fprintf(stderr,
+ "%s: Error: failed to add pmDesc for %s (%s): %s\n",
+ pmProgname, namelist[numpmid], pmIDStr(pmidlist[numpmid]), pmErrStr(sts));
+ exit(1);
+ }
+
+ /*
+ * instance domain initialization
+ */
+ if (mp->idesc.indom != PM_INDOM_NULL) {
+ /*
+ * has an instance domain, check to see if it has already been seen
+ */
+ int j;
+
+ for (j = 0; j <= numpmid; j++) {
+ if (metriclist[j].idp != NULL &&
+ metriclist[j].idp->indom == mp->idesc.indom) {
+ mp->idp = metriclist[j].idp;
+ break;
+ }
+ }
+ if (j > numpmid) {
+ /* first sighting, allocate a new one */
+ if ((mp->idp = (indom_t *)malloc(sizeof(indom_t))) == NULL) {
+ fprintf(stderr,
+ "%s: dometric: Error: cannot malloc indom_t for %s\n",
+ pmProgname, pmInDomStr(mp->idesc.indom));
+ exit(1);
+ }
+ mp->idp->indom = mp->idesc.indom;
+ mp->idp->numinst = 0;
+ mp->idp->inst = NULL;
+ mp->idp->name = NULL;
+ }
+ }
+
+#if PCP_DEBUG
+ if (pmDebug & DBG_TRACE_APPL0) {
+ if (mp->idp != NULL)
+ fprintf(stderr, " indom %s -> (" PRINTF_P_PFX "%p)\n", pmInDomStr(mp->idp->indom), mp->idp);
+ }
+#endif
+
+done:
+
+ numpmid++;
+}
+
diff --git a/src/pmlogreduce/indom.c b/src/pmlogreduce/indom.c
new file mode 100644
index 0000000..57efc4e
--- /dev/null
+++ b/src/pmlogreduce/indom.c
@@ -0,0 +1,103 @@
+#include "pmlogreduce.h"
+
+void
+doindom(pmResult *rp)
+{
+ pmValueSet *vsp;
+ int i;
+ int j;
+ int needti = 0;
+ int need;
+ metric_t *mp = NULL;
+ int *instlist;
+ char **namelist;
+ int sts;
+
+ for (i = 0; i < rp->numpmid; i++) {
+ vsp = rp->vset[i];
+ if (vsp->numval <= 0)
+ continue;
+
+ /*
+ * pmidlist[] and rp->vset[]->pmid may not be in 1:1
+ * correspondence because we come here after rewrite() has
+ * been called ... search for matching pmid
+ */
+ for (j = 0; j < numpmid; j++) {
+ if (pmidlist[j] == vsp->pmid) {
+ mp = &metriclist[j];
+ break;
+ }
+ }
+ if (mp == NULL) {
+ fprintf(stderr,
+ "%s: doindom: Arrgh, unexpected PMID %s @ vset[%d]\n",
+ pmProgname, pmIDStr(vsp->pmid), i);
+ __pmDumpResult(stderr, rp);
+ exit(1);
+ }
+ if (mp->idp == NULL)
+ continue;
+
+ if ((sts = pmGetInDom(mp->idp->indom, &instlist, &namelist)) < 0) {
+ fprintf(stderr,
+ "%s: doindom: pmGetInDom (%s) failed: %s\n",
+ pmProgname, pmInDomStr(mp->idp->indom), pmErrStr(sts));
+ exit(1);
+ }
+
+ need = 1;
+ /*
+ * Need to output the indom if the number of instances changes
+ * or the set of instance ids are not the same from the last
+ * time.
+ */
+ if (sts == mp->idp->numinst) {
+ for (j = 0; j < mp->idp->numinst; j++) {
+ if (mp->idp->inst[j] != instlist[j])
+ break;
+ }
+ if (j == mp->idp->numinst) {
+ /*
+ * Do we need to check the namelist elts as well, e.g.
+ * using strcmp()?
+ * Not at this stage ... if the instance ids are all the
+ * same, then only a very odd (and non-compliant) PMDA
+ * would change the mapping from id to name on the fly
+ */
+ need = 0;
+ }
+ }
+
+ if (need) {
+#if PCP_DEBUG
+ if (pmDebug & DBG_TRACE_APPL0) {
+ fprintf(stderr, "Add metadata: indom %s for metric %s\n", pmInDomStr(mp->idp->indom), pmIDStr(vsp->pmid));
+ }
+#endif
+ if (mp->idp->name != NULL) free(mp->idp->name);
+ if (mp->idp->inst != NULL) free(mp->idp->inst);
+ mp->idp->name = namelist;
+ mp->idp->inst = instlist;
+ mp->idp->numinst = sts;
+ if ((sts = __pmLogPutInDom(&logctl, mp->idp->indom, &current, mp->idp->numinst, mp->idp->inst, mp->idp->name)) < 0) {
+ fprintf(stderr,
+ "%s: Error: failed to add pmInDom: indom %s (for pmid %s): %s\n",
+ pmProgname, pmInDomStr(mp->idp->indom), pmIDStr(vsp->pmid), pmErrStr(sts));
+ exit(1);
+ }
+ needti = 1; /* requires a temporal index update */
+ }
+ else {
+ free(instlist);
+ free(namelist);
+ }
+
+ }
+
+ if (needti) {
+ fflush(logctl.l_mdfp);
+ __pmLogPutIndex(&logctl, &current);
+ }
+
+}
diff --git a/src/pmlogreduce/logio.c b/src/pmlogreduce/logio.c
new file mode 100644
index 0000000..09a135b
--- /dev/null
+++ b/src/pmlogreduce/logio.c
@@ -0,0 +1,260 @@
+/*
+ * utils for pmlogextract
+ *
+ * Copyright (c) 1997-2002 Silicon Graphics, Inc. All Rights Reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * for more details.
+ */
+
+#include "pmlogreduce.h"
+
+/*
+ * raw read of next log record - largely stolen from __pmLogRead in libpcp
+ */
+int
+_pmLogGet(__pmLogCtl *lcp, int vol, __pmPDU **pb)
+{
+ int head;
+ int tail;
+ int sts;
+ long offset;
+ char *p;
+ __pmPDU *lpb;
+ FILE *f;
+
+ if (vol == PM_LOG_VOL_META)
+ f = lcp->l_mdfp;
+ else
+ f = lcp->l_mfp;
+
+ offset = ftell(f);
+#ifdef PCP_DEBUG
+ if (pmDebug & DBG_TRACE_LOG) {
+ fprintf(stderr, "_pmLogGet: fd=%d vol=%d posn=%ld ",
+ fileno(f), vol, offset);
+ }
+#endif
+
+again:
+ sts = (int)fread(&head, 1, sizeof(head), f);
+ if (sts != sizeof(head)) {
+ if (sts == 0) {
+#ifdef PCP_DEBUG
+ if (pmDebug & DBG_TRACE_LOG)
+ fprintf(stderr, "AFTER end\n");
+#endif
+ fseek(f, offset, SEEK_SET);
+ if (vol != PM_LOG_VOL_META) {
+ if (lcp->l_curvol < lcp->l_maxvol) {
+ if (__pmLogChangeVol(lcp, lcp->l_curvol+1) == 0) {
+ f = lcp->l_mfp;
+ goto again;
+ }
+ }
+ }
+ return PM_ERR_EOL;
+ }
+#ifdef PCP_DEBUG
+ if (pmDebug & DBG_TRACE_LOG)
+ fprintf(stderr, "Error: hdr fread=%d %s\n", sts, osstrerror());
+#endif
+ if (sts > 0)
+ return PM_ERR_LOGREC;
+ else
+ return -oserror();
+ }
+
+ if ((lpb = (__pmPDU *)malloc(ntohl(head))) == NULL) {
+#ifdef PCP_DEBUG
+ if (pmDebug & DBG_TRACE_LOG)
+ fprintf(stderr, "Error: _pmLogGet:(%d) %s\n",
+ (int)ntohl(head), osstrerror());
+#endif
+ fseek(f, offset, SEEK_SET);
+ return -oserror();
+ }
+
+ lpb[0] = head;
+ if ((sts = (int)fread(&lpb[1], 1, ntohl(head) - sizeof(head), f)) != ntohl(head) - sizeof(head)) {
+#ifdef PCP_DEBUG
+ if (pmDebug & DBG_TRACE_LOG)
+ fprintf(stderr, "Error: data fread=%d %s\n", sts, osstrerror());
+#endif
+ if (sts == 0) {
+ fseek(f, offset, SEEK_SET);
+ free(lpb);
+ return PM_ERR_EOL;
+ }
+ else if (sts > 0) {
+ free(lpb);
+ return PM_ERR_LOGREC;
+ }
+ else {
+ int e = -oserror();
+ free(lpb);
+ return e;
+ }
+ }
+
+
+ p = (char *)lpb;
+ memcpy(&tail, &p[ntohl(head) - sizeof(head)], sizeof(head));
+ if (head != tail) {
+#ifdef PCP_DEBUG
+ if (pmDebug & DBG_TRACE_LOG)
+ fprintf(stderr, "Error: head-tail mismatch (%d-%d)\n",
+ (int)ntohl(head), (int)ntohl(tail));
+#endif
+ return PM_ERR_LOGREC;
+ }
+
+#ifdef PCP_DEBUG
+ if (pmDebug & DBG_TRACE_LOG) {
+ if (vol != PM_LOG_VOL_META || ntohl(lpb[1]) == TYPE_INDOM) {
+ fprintf(stderr, "@");
+ if (sts >= 0) {
+ struct timeval stamp;
+ __pmTimeval *tvp = (__pmTimeval *)&lpb[vol == PM_LOG_VOL_META ? 2 : 1];
+ stamp.tv_sec = ntohl(tvp->tv_sec);
+ stamp.tv_usec = ntohl(tvp->tv_usec);
+ __pmPrintStamp(stderr, &stamp);
+ }
+ else
+ fprintf(stderr, "unknown time");
+ }
+ fprintf(stderr, " len=%d (incl head+tail)\n", (int)ntohl(head));
+ }
+#endif
+
+#ifdef PCP_DEBUG
+ if (pmDebug & DBG_TRACE_PDU) {
+ int i, j;
+ struct timeval stamp;
+ __pmTimeval *tvp = (__pmTimeval *)&lpb[vol == PM_LOG_VOL_META ? 2 : 1];
+ fprintf(stderr, "_pmLogGet");
+ if (vol != PM_LOG_VOL_META || ntohl(lpb[1]) == TYPE_INDOM) {
+ fprintf(stderr, " timestamp=");
+ stamp.tv_sec = ntohl(tvp->tv_sec);
+ stamp.tv_usec = ntohl(tvp->tv_usec);
+ __pmPrintStamp(stderr, &stamp);
+ }
+ fprintf(stderr, " " PRINTF_P_PFX "%p ... " PRINTF_P_PFX "%p", lpb, &lpb[ntohl(head)/sizeof(__pmPDU) - 1]);
+ fputc('\n', stderr);
+ fprintf(stderr, "%03d: ", 0);
+ for (j = 0, i = 0; j < ntohl(head)/sizeof(__pmPDU); j++) {
+ if (i == 8) {
+ fprintf(stderr, "\n%03d: ", j);
+ i = 0;
+ }
+ fprintf(stderr, "0x%x ", lpb[j]);
+ i++;
+ }
+ fputc('\n', stderr);
+ }
+#endif
+
+ *pb = lpb;
+ return 0;
+}
+
+int
+_pmLogPut(FILE *f, __pmPDU *pb)
+{
+ int rlen = ntohl(pb[0]);
+ int sts;
+
+#ifdef PCP_DEBUG
+ if (pmDebug & DBG_TRACE_LOG) {
+ fprintf(stderr, "_pmLogPut: fd=%d rlen=%d\n",
+ fileno(f), rlen);
+ }
+#endif
+
+ if ((sts = (int)fwrite(pb, 1, rlen, f)) != rlen) {
+#ifdef PCP_DEBUG
+ if (pmDebug & DBG_TRACE_LOG)
+ fprintf(stderr, "_pmLogPut: fwrite=%d %s\n", sts, osstrerror());
+#endif
+ return -oserror();
+ }
+ return 0;
+}
+
+/*
+ * construct new external label, and check label records from
+ * input archives
+ */
+void
+newlabel(void)
+{
+ __pmLogLabel *lp = &logctl.l_label;
+
+ /* check version number */
+ if ((ilabel.ll_magic & 0xff) != PM_LOG_VERS02) {
+ fprintf(stderr,"%s: Error: version number %d (not %d as expected) in archive (%s)\n",
+ pmProgname, ilabel.ll_magic & 0xff, PM_LOG_VERS02, iname);
+ exit(1);
+ }
+
+ /* copy magic number, host and timezone, use our pid */
+ lp->ill_magic = ilabel.ll_magic;
+ lp->ill_pid = (int)getpid();
+ strncpy(lp->ill_hostname, ilabel.ll_hostname, PM_LOG_MAXHOSTLEN);
+ lp->ill_hostname[PM_LOG_MAXHOSTLEN-1] = '\0';
+ strncpy(lp->ill_tz, ilabel.ll_tz, PM_TZ_MAXLEN);
+ lp->ill_tz[PM_TZ_MAXLEN-1] = '\0';
+}
+
+
+/*
+ * write label records into all files of the output archive
+ */
+void
+writelabel(void)
+{
+ logctl.l_label.ill_vol = 0;
+ __pmLogWriteLabel(logctl.l_mfp, &logctl.l_label);
+ logctl.l_label.ill_vol = PM_LOG_VOL_TI;
+ __pmLogWriteLabel(logctl.l_tifp, &logctl.l_label);
+ logctl.l_label.ill_vol = PM_LOG_VOL_META;
+ __pmLogWriteLabel(logctl.l_mdfp, &logctl.l_label);
+}
+
+/*
+ * switch output volumes
+ */
+void
+newvolume(char *base, __pmTimeval *tvp)
+{
+ FILE *newfp;
+ int nextvol = logctl.l_curvol + 1;
+ struct timeval stamp;
+
+ if ((newfp = __pmLogNewFile(base, nextvol)) != NULL) {
+ fclose(logctl.l_mfp);
+ logctl.l_mfp = newfp;
+ logctl.l_label.ill_vol = logctl.l_curvol = nextvol;
+ __pmLogWriteLabel(logctl.l_mfp, &logctl.l_label);
+ fflush(logctl.l_mfp);
+ stamp.tv_sec = tvp->tv_sec;
+ stamp.tv_usec = tvp->tv_usec;
+ fprintf(stderr, "%s: New log volume %d, at ",
+ pmProgname, nextvol);
+ __pmPrintStamp(stderr, &stamp);
+ fputc('\n', stderr);
+ return;
+ }
+ else {
+ fprintf(stderr, "%s: Error: volume %d: %s\n",
+ pmProgname, nextvol, pmErrStr(-oserror()));
+ exit(1);
+ }
+}
diff --git a/src/pmlogreduce/pmlogreduce.c b/src/pmlogreduce/pmlogreduce.c
new file mode 100644
index 0000000..036953a
--- /dev/null
+++ b/src/pmlogreduce/pmlogreduce.c
@@ -0,0 +1,490 @@
+/*
+ * pmlogreduce - statistical reduction of a PCP archive log
+ *
+ * Copyright (c) 2014 Red Hat.
+ * Copyright (c) 2004 Silicon Graphics, Inc. All Rights Reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * for more details.
+ *
+ * TODO (global list)
+ * - check for counter overflow in doscan()
+ * - optimization (maybe) for discrete and instantaneous metrics
+ * to suppress repeated values, provided you get the boundary
+ * conditions correct ... beware of mark records and past last
+ * value ... it may be difficult to distinguish ... in practice
+ * this may not be worth it because discrete is rare and
+ * instantaneous is very likely to change over long enough time
+ * ... counter example is hinv.* that interpolate returns on every
+ * fetch, even only once in the input archive
+ * - performance profiling
+ * - testing with dynamic instance domains
+ * - check comments ahead of call to doscan() and the description
+ * in the head of scan.c
+ *
+ * Debug flags
+ * APPL0
+ * initialization
+ * metadata
+ * APPL1
+ * inst-value scanning in doscan()
+ * APPL2
+ * scan summary
+ * details for records read and records written
+ */
+
+#include <sys/stat.h>
+#include "pmlogreduce.h"
+
+/*
+ * globals defined in pmlogreduce.h
+ */
+__pmTimeval current; /* most recent timestamp overall */
+char *iname; /* name of input archive */
+pmLogLabel ilabel; /* input archive label */
+int numpmid; /* all metrics from the input archive */
+pmID *pmidlist;
+char **namelist;
+metric_t *metriclist;
+__pmLogCtl logctl; /* output archive control */
+/* command line args */
+double targ = 600.0; /* -t arg - interval b/n output samples */
+int sarg = -1; /* -s arg - finish after X samples */
+char *Sarg; /* -S arg - window start */
+char *Targ; /* -T arg - window end */
+char *Aarg; /* -A arg - output time alignment */
+int varg = -1; /* -v arg - switch log vol every X */
+int zarg; /* -z arg - use archive timezone */
+char *tz; /* -Z arg - use timezone from user */
+
+int written; /* num log writes so far */
+int exit_status;
+
+/* archive control stuff */
+int ictx_a;
+char *oname; /* name of output archive */
+pmLogLabel olabel; /* output archive label */
+struct timeval winstart_tval; /* window start tval*/
+
+/* time window stuff */
+static struct timeval logstart_tval; /* reduced log start */
+static struct timeval logend_tval; /* reduced log end */
+static struct timeval winend_tval; /* window end tval */
+
+/* cmd line args that could exist, but don't (needed for pmParseTimeWin) */
+static char *Oarg; /* -O arg - non-existent */
+
+static pmLongOptions longopts[] = {
+ PMAPI_OPTIONS_HEADER("Options"),
+ PMOPT_ALIGN,
+ PMOPT_DEBUG,
+ PMOPT_START,
+ PMOPT_SAMPLES,
+ PMOPT_FINISH,
+ { "interval", 1, 't', "DELTA", "sample output interval [default 10min]" },
+ { "", 1, 'v', "NUM", "switch log volumes after this many samples" },
+ PMOPT_TIMEZONE,
+ PMOPT_HOSTZONE,
+ PMOPT_HELP,
+ PMAPI_OPTIONS_END
+};
+
+static pmOptions opts = {
+ .short_options = "A:D:S:s:T:t:v:Z:z?",
+ .long_options = longopts,
+ .short_usage = "[options] input-archive output-archive",
+};
+
+static double
+tv2double(struct timeval *tv)
+{
+ return tv->tv_sec + (double)tv->tv_usec / 1000000.0;
+}
+
+static int
+parseargs(int argc, char *argv[])
+{
+ int c;
+ int sts;
+ char *endnum;
+ char *msg;
+ struct timeval interval;
+
+ while ((c = pmgetopt_r(argc, argv, &opts)) != EOF) {
+ switch (c) {
+
+ case 'A': /* output time alignment */
+ Aarg = opts.optarg;
+ break;
+
+ case 'D': /* debug flag */
+ sts = __pmParseDebug(opts.optarg);
+ if (sts < 0) {
+ pmprintf("%s: unrecognized debug flag specification (%s)\n",
+ pmProgname, opts.optarg);
+ opts.errors++;
+ }
+ else
+ pmDebug |= sts;
+ break;
+
+ case 's': /* number of samples to write out */
+ sarg = (int)strtol(opts.optarg, &endnum, 10);
+ if (*endnum != '\0' || sarg < 0) {
+ pmprintf("%s: -s requires numeric argument\n",
+ pmProgname);
+ opts.errors++;
+ }
+ break;
+
+ case 'S': /* start time for reduction */
+ Sarg = opts.optarg;
+ break;
+
+ case 'T': /* end time for reduction */
+ Targ = opts.optarg;
+ break;
+
+ case 't': /* output sample interval */
+ if (pmParseInterval(opts.optarg, &interval, &msg) < 0) {
+ pmprintf("%s", msg);
+ free(msg);
+ opts.errors++;
+ }
+ else
+ targ = tv2double(&interval);
+ break;
+
+ case 'v': /* number of samples per volume */
+ varg = (int)strtol(opts.optarg, &endnum, 10);
+ if (*endnum != '\0' || varg < 0) {
+ pmprintf("%s: -v requires numeric argument\n",
+ pmProgname);
+ opts.errors++;
+ }
+ break;
+
+ case 'Z': /* use timezone from command line */
+ if (zarg) {
+ pmprintf("%s: at most one of -Z and/or -z allowed\n",
+ pmProgname);
+ opts.errors++;
+
+ }
+ tz = opts.optarg;
+ break;
+
+ case 'z': /* use timezone from archive */
+ if (tz != NULL) {
+ pmprintf("%s: at most one of -Z and/or -z allowed\n",
+ pmProgname);
+ opts.errors++;
+ }
+ zarg++;
+ break;
+
+ case '?':
+ default:
+ opts.errors++;
+ break;
+ }
+ }
+
+ if (opts.errors == 0 && opts.optind > argc-2) {
+ pmprintf("%s: Error: insufficient arguments\n", pmProgname);
+ opts.errors++;
+ }
+
+ return -opts.errors;
+}
+
+int
+main(int argc, char **argv)
+{
+ int sts;
+ char *msg;
+ pmResult *irp; /* input pmResult */
+ pmResult *orp; /* output pmResult */
+ __pmPDU *pb; /* pdu buffer */
+ struct timeval unused;
+ unsigned long peek_offset;
+
+ /* process cmd line args */
+ if (parseargs(argc, argv) < 0) {
+ pmUsageMessage(&opts);
+ exit(1);
+ }
+
+ /* input archive name is argv[opts.optind] */
+ /* output archive name is argv[argc-1]) */
+
+ /* output archive */
+ oname = argv[argc-1];
+
+ /* input archive */
+ iname = argv[opts.optind];
+
+ /*
+ * This is the interp mode context
+ */
+ if ((ictx_a = pmNewContext(PM_CONTEXT_ARCHIVE, iname)) < 0) {
+ fprintf(stderr, "%s: Error: cannot open archive \"%s\" (ctx_a): %s\n",
+ pmProgname, iname, pmErrStr(ictx_a));
+ exit(1);
+ }
+
+ if ((sts = pmGetArchiveLabel(&ilabel)) < 0) {
+ fprintf(stderr, "%s: Error: cannot get archive label record (%s): %s\n", pmProgname, iname, pmErrStr(sts));
+ exit(1);
+ }
+
+ /* start time */
+ logstart_tval.tv_sec = ilabel.ll_start.tv_sec;
+ logstart_tval.tv_usec = ilabel.ll_start.tv_usec;
+
+ /* end time */
+ if ((sts = pmGetArchiveEnd(&logend_tval)) < 0) {
+ fprintf(stderr, "%s: Error: cannot get end of archive (%s): %s\n",
+ pmProgname, iname, pmErrStr(sts));
+ exit(1);
+ }
+
+ if (zarg) {
+ /* use TZ from metrics source (input-archive) */
+ if ((sts = pmNewZone(ilabel.ll_tz)) < 0) {
+ fprintf(stderr, "%s: Cannot set context timezone: %s\n",
+ pmProgname, pmErrStr(sts));
+ exit(1);
+ }
+ printf("Note: timezone set to local timezone of host \"%s\" from archive\n\n", ilabel.ll_hostname);
+ }
+ else if (tz != NULL) {
+ /* use TZ as specified by user */
+ if ((sts = pmNewZone(tz)) < 0) {
+ fprintf(stderr, "%s: Cannot set timezone to \"%s\": %s\n",
+ pmProgname, tz, pmErrStr(sts));
+ exit(1);
+ }
+ printf("Note: timezone set to \"TZ=%s\"\n\n", tz);
+ }
+ else {
+ char *tz;
+ tz = __pmTimezone();
+ /* use TZ from local host */
+ if ((sts = pmNewZone(tz)) < 0) {
+ fprintf(stderr, "%s: Cannot set local host's timezone: %s\n",
+ pmProgname, pmErrStr(sts));
+ exit(1);
+ }
+ }
+
+ /* set winstart and winend timevals */
+ sts = pmParseTimeWindow(Sarg, Targ, Aarg, Oarg,
+ &logstart_tval, &logend_tval,
+ &winstart_tval, &winend_tval, &unused, &msg);
+ if (sts < 0) {
+ fprintf(stderr, "%s: Invalid time window specified: %s\n",
+ pmProgname, msg);
+ exit(1);
+ }
+#if PCP_DEBUG
+ if (pmDebug & DBG_TRACE_APPL0) {
+ char buf[26];
+ pmCtime((const time_t *)&winstart_tval.tv_sec, buf);
+ fprintf(stderr, "Start time: %s", buf);
+ pmCtime((const time_t *)&winend_tval.tv_sec, buf);
+ fprintf(stderr, "End time: %s", buf);
+ }
+#endif
+
+ if ((sts = pmSetMode(PM_MODE_INTERP | PM_XTB_SET(PM_TIME_SEC),
+ &winstart_tval, (int)targ)) < 0) {
+ fprintf(stderr, "%s: pmSetMode(PM_MODE_INTERP ...) failed: %s\n",
+ pmProgname, pmErrStr(sts));
+ exit(1);
+ }
+
+ /* create output log - must be done before writing label */
+ if ((sts = __pmLogCreate("", oname, PM_LOG_VERS02, &logctl)) < 0) {
+ fprintf(stderr, "%s: Error: __pmLogCreate: %s\n",
+ pmProgname, pmErrStr(sts));
+ exit(1);
+ }
+
+ /* This must be done after log is created:
+ * - checks that archive version, host, and timezone are ok
+ * - set archive version, host, and timezone of output archive
+ * - set start time
+ * - write labels
+ */
+ newlabel();
+ current.tv_sec = logctl.l_label.ill_start.tv_sec = winstart_tval.tv_sec;
+ current.tv_usec = logctl.l_label.ill_start.tv_usec = winstart_tval.tv_usec;
+ /* write label record */
+ writelabel();
+ /*
+ * Supress any automatic label creation in libpcp at the first
+ * pmResult write.
+ */
+ logctl.l_state = PM_LOG_STATE_INIT;
+
+ /*
+ * Traverse the PMNS to get all the metrics and their metadata
+ */
+ if ((sts = pmTraversePMNS ("", dometric)) < 0) {
+ fprintf(stderr, "%s: Error traversing namespace ... %s\n",
+ pmProgname, pmErrStr(sts));
+ goto cleanup;
+ }
+
+ /*
+ * All the initial metadata has been generated, add timestamp
+ */
+ fflush(logctl.l_mdfp);
+ __pmLogPutIndex(&logctl, &current);
+
+ written = 0;
+
+ /*
+ * main loop
+ */
+ while (sarg == -1 || written < sarg) {
+ /*
+ * do stuff
+ */
+ if ((sts = pmUseContext(ictx_a)) < 0) {
+ fprintf(stderr, "%s: Error: cannot use context (%s): %s\n",
+ pmProgname, iname, pmErrStr(sts));
+ goto cleanup;
+ }
+ if ((sts = pmFetch(numpmid, pmidlist, &irp)) < 0) {
+ if (sts == PM_ERR_EOL)
+ break;
+ fprintf(stderr,
+ "%s: Error: pmFetch failed: %s\n", pmProgname, pmErrStr(sts));
+ exit(1);
+ }
+ if (irp->timestamp.tv_sec > winend_tval.tv_sec ||
+ (irp->timestamp.tv_sec == winend_tval.tv_sec &&
+ irp->timestamp.tv_usec > winend_tval.tv_usec)) {
+ /* past end time as per -T */
+ break;
+ }
+#if PCP_DEBUG
+ if (pmDebug & DBG_TRACE_APPL2) {
+ fprintf(stderr, "input record ...\n");
+ __pmDumpResult(stderr, irp);
+ }
+#endif
+
+ /*
+ * traverse the interval, looking at every archive record ...
+ * we are particularly interested in:
+ * - metric-values that are interpolated but not present in
+ * this interval
+ * - counter wraps
+ * - mark records
+ */
+ doscan(&irp->timestamp);
+
+ if ((sts = pmUseContext(ictx_a)) < 0) {
+ fprintf(stderr, "%s: Error: cannot use context (%s): %s\n",
+ pmProgname, iname, pmErrStr(sts));
+ goto cleanup;
+ }
+
+ orp = rewrite(irp);
+#if PCP_DEBUG
+ if (pmDebug & DBG_TRACE_APPL2) {
+ if (orp == NULL)
+ fprintf(stderr, "output record ... none!\n");
+ else {
+ fprintf(stderr, "output record ...\n");
+ __pmDumpResult(stderr, orp);
+ }
+ }
+#endif
+ if (orp == NULL)
+ goto next;
+
+ /*
+ * convert log record to a PDU, and enforce V2 encoding semantics,
+ * then write it out
+ */
+ sts = __pmEncodeResult(PDU_OVERRIDE2, orp, &pb);
+ if (sts < 0) {
+ fprintf(stderr, "%s: Error: __pmEncodeResult: %s\n",
+ pmProgname, pmErrStr(sts));
+ goto cleanup;
+ }
+
+ /* switch volumes if required */
+ if (varg > 0) {
+ if (written > 0 && (written % varg) == 0) {
+ __pmTimeval next_stamp;
+ next_stamp.tv_sec = irp->timestamp.tv_sec;
+ next_stamp.tv_usec = irp->timestamp.tv_usec;
+ newvolume(oname, &next_stamp);
+ }
+ }
+ /*
+ * Even without a -v option, we may need to switch volumes
+ * if the data file exceeds 2^31-1 bytes
+ */
+ peek_offset = ftell(logctl.l_mfp);
+ peek_offset += ((__pmPDUHdr *)pb)->len - sizeof(__pmPDUHdr) + 2*sizeof(int);
+ if (peek_offset > 0x7fffffff) {
+ __pmTimeval next_stamp;
+ next_stamp.tv_sec = irp->timestamp.tv_sec;
+ next_stamp.tv_usec = irp->timestamp.tv_usec;
+ newvolume(oname, &next_stamp);
+ }
+
+ current.tv_sec = orp->timestamp.tv_sec;
+ current.tv_usec = orp->timestamp.tv_usec;
+
+ doindom(orp);
+
+ /* write out log record */
+ sts = __pmLogPutResult2(&logctl, pb);
+ __pmUnpinPDUBuf(pb);
+ if (sts < 0) {
+ fprintf(stderr, "%s: Error: __pmLogPutResult2: log data: %s\n",
+ pmProgname, pmErrStr(sts));
+ goto cleanup;
+ }
+ written++;
+
+ rewrite_free();
+
+next:
+ pmFreeResult(irp);
+ }
+
+ /* write the last time stamp */
+ fflush(logctl.l_mfp);
+ fflush(logctl.l_mdfp);
+ __pmLogPutIndex(&logctl, &current);
+
+ exit(exit_status);
+
+cleanup:
+ {
+ char fname[MAXNAMELEN];
+ fprintf(stderr, "Archive \"%s\" not created.\n", oname);
+ snprintf(fname, sizeof(fname), "%s.0", oname);
+ unlink(fname);
+ snprintf(fname, sizeof(fname), "%s.meta", oname);
+ unlink(fname);
+ snprintf(fname, sizeof(fname), "%s.index", oname);
+ unlink(fname);
+ exit(1);
+ }
+}
diff --git a/src/pmlogreduce/pmlogreduce.h b/src/pmlogreduce/pmlogreduce.h
new file mode 100644
index 0000000..c654518
--- /dev/null
+++ b/src/pmlogreduce/pmlogreduce.h
@@ -0,0 +1,85 @@
+#include "pmapi.h"
+#include "impl.h"
+
+#define NUM_SEC_PER_DAY 86400
+
+/*
+ * Value control for a metric-instance and the last observed input value.
+ * Used for rate conversion and supression of repeating values for
+ * instantaneous and discrete metrics
+ */
+typedef struct value {
+ struct value *next; /* next for this metric */
+ int inst; /* instance id */
+ pmAtomValue value; /* last output value */
+ struct timeval timestamp; /* time of last output value */
+ /*
+ * last interval value interpretation ... set in doscan() and used in
+ * rewrite()
+ */
+ int control;
+ int nobs; /* number of observations */
+ int nwrap; /* number of counter wraps */
+ pmAtomValue pvalue; /* used for counter wrap detection */
+} value_t;
+
+/*
+ * control values ... bit fields
+ */
+#define V_INIT 1
+#define V_SEEN 2
+
+/*
+ * instance domain control
+ */
+typedef struct {
+ int indom;
+ int numinst;
+ int *inst;
+ char **name;
+} indom_t;
+
+/*
+ * Metric control record in metric hash list
+ */
+typedef struct {
+ pmDesc idesc; /* input archive descriptor */
+ pmDesc odesc; /* output archive descriptor */
+ value_t *first; /* list of values, one per instance */
+ indom_t *idp; /* instance domain control, if any */
+ int mode; /* have to skip or rewrite the value format */
+} metric_t;
+#define MODE_NORMAL 0
+#define MODE_REWRITE 1
+#define MODE_SKIP 2
+
+extern __pmTimeval current; /* most recent timestamp overall */
+extern char *iname; /* name of input archive */
+extern pmLogLabel ilabel; /* input archive label */
+extern int numpmid; /* all metrics from the input archive */
+extern pmID *pmidlist; /* ditto */
+extern char **namelist; /* ditto */
+extern metric_t *metriclist; /* ditto */
+extern __pmLogCtl logctl; /* output archive control */
+extern double targ; /* -t arg - interval b/n output samples */
+extern int sarg; /* -s arg - finish after X samples */
+extern char *Sarg; /* -S arg - window start */
+extern char *Targ; /* -T arg - window end */
+extern char *Aarg; /* -A arg - output time alignment */
+extern int varg; /* -v arg - switch log vol every X */
+extern int zarg; /* -z arg - use archive timezone */
+extern char *tz; /* -Z arg - use timezone from user */
+
+
+extern int _pmLogGet(__pmLogCtl *, int, __pmPDU **);
+extern int _pmLogPut(FILE *, __pmPDU *);
+extern void newlabel(void);
+extern void writelabel(void);
+extern void newvolume(char *, __pmTimeval *);
+
+extern pmResult *rewrite(pmResult *);
+extern void rewrite_free(void);
+
+extern void dometric(const char *);
+extern void doindom(pmResult *);
+extern void doscan(struct timeval *);
diff --git a/src/pmlogreduce/rewrite.c b/src/pmlogreduce/rewrite.c
new file mode 100644
index 0000000..eae00e4
--- /dev/null
+++ b/src/pmlogreduce/rewrite.c
@@ -0,0 +1,178 @@
+#include "pmlogreduce.h"
+#include <inttypes.h>
+
+static pmResult *orp;
+
+/*
+ * Must either re-write the pmResult, or return NULL for non-fatal
+ * errors, else report and exit for catastrophic errors ...
+ */
+pmResult *
+rewrite(pmResult *rp)
+{
+ int i;
+ int sts;
+
+ if ((orp = (pmResult *)malloc(sizeof(pmResult) +
+ (rp->numpmid - 1) * sizeof(pmValueSet *))) == NULL) {
+ fprintf(stderr,
+ "%s: rewrite: cannot malloc pmResult for %d metrics\n",
+ pmProgname, rp->numpmid);
+ exit(1);
+ }
+ orp->numpmid = 0;
+ orp->timestamp = rp->timestamp; /* struct assignment */
+
+ for (i = 0; i < rp->numpmid; i++) {
+ metric_t *mp;
+ value_t *vp;
+ pmValueSet *vsp = rp->vset[i];
+ pmValueSet *ovsp;
+ int j;
+ int need;
+
+ if (pmidlist[i] != vsp->pmid) {
+ fprintf(stderr,
+ "%s: rewrite: Arrgh, mismatched PMID %s vs %s\n",
+ pmProgname, pmIDStr(pmidlist[i]), pmIDStr(vsp->pmid));
+ exit(1);
+ }
+
+ if (vsp->numval > 0)
+ need = (vsp->numval - 1) * sizeof(pmValue);
+ else
+ need = 0;
+ ovsp = (pmValueSet *)malloc(sizeof(pmValueSet) +
+ need*sizeof(pmValue));
+ if (ovsp == NULL) {
+ __uint64_t bytes = (sizeof(pmValueSet) + need * sizeof(pmValue));
+ fprintf(stderr,
+ "%s: rewrite: Arrgh, cannot malloc %"PRIi64" bytes for ovsp\n",
+ pmProgname, bytes);
+ exit(1);
+ }
+ ovsp->pmid = vsp->pmid;
+ ovsp->valfmt = vsp->valfmt;
+ if (vsp->numval <= 0) {
+ ovsp->numval = vsp->numval;
+ orp->vset[orp->numpmid] = ovsp;
+ orp->numpmid++;
+ }
+ else {
+ ovsp->numval = 0;
+ mp = &metriclist[i];
+ if (mp->mode != MODE_SKIP) {
+ for (j = 0; j < vsp->numval; j++) {
+ for (vp = mp->first; vp != NULL; vp = vp->next) {
+ if (vp->inst == vsp->vlist[j].inst)
+ break;
+ }
+ if (vp == NULL) {
+ fprintf(stderr,
+ "%s: rewrite: Arrgh: cannot find inst %d in value_t list for %s (%s)\n",
+ pmProgname, vsp->vlist[j].inst, namelist[i], pmIDStr(vsp->pmid));
+ exit(1);
+ }
+ if ((vp->control & (V_SEEN|V_INIT)) == 0)
+ continue;
+ /*
+ * we've seen this metric-instance pair in the last
+ * interval, or it is the first time for this one
+ */
+ if (mp->mode == MODE_REWRITE) {
+ pmAtomValue av;
+ int k;
+ sts = pmExtractValue(vsp->valfmt, &vsp->vlist[j], mp->idesc.type, &av, mp->odesc.type);
+ if (sts < 0) {
+ fprintf(stderr,
+ "%s: rewrite: pmExtractValue failed for pmid %s value %d: %s\n",
+ pmProgname, pmIDStr(vsp->pmid), j, pmErrStr(sts));
+ exit(1);
+ }
+ ovsp->pmid = vsp->pmid;
+ ovsp->vlist[ovsp->numval].inst = vsp->vlist[j].inst;
+ k = __pmStuffValue(&av, &ovsp->vlist[ovsp->numval], mp->odesc.type);
+ if (k < 0) {
+ fprintf(stderr,
+ "%s: rewrite: __pmStuffValue failed for pmid %s value %d: %s\n",
+ pmProgname, pmIDStr(vsp->pmid), j, pmErrStr(sts));
+ exit(1);
+ }
+ if (ovsp->numval == 0)
+ ovsp->valfmt = k;
+ ovsp->numval++;
+ vp->timestamp = rp->timestamp;
+ vp->value = av;
+ }
+ else {
+ ovsp->vlist[ovsp->numval] = vsp->vlist[j];
+ ovsp->numval++;
+ }
+ vp->control &= ~V_INIT;
+ }
+ }
+ if (ovsp->numval > 0) {
+ orp->vset[orp->numpmid] = ovsp;
+ orp->numpmid++;
+ }
+ else
+ free(ovsp);
+ }
+ }
+
+ if (orp->numpmid == 0) {
+ /*
+ * very unlikely that all metrics are either skipped or have
+ * no values, but it might happen ... do not allow this record
+ * to be written because it looks like a "mark" record with
+ * numpmid == 0
+ */
+ free(orp);
+ orp = NULL;
+ }
+
+ return orp;
+}
+
+void
+rewrite_free(void)
+{
+ int i;
+
+ if (orp == NULL)
+ return;
+
+ for (i = 0; i < orp->numpmid; i++) {
+ pmValueSet *vsp = orp->vset[i];
+ int j;
+ metric_t *mp;
+
+ for (j = 0; j < numpmid; j++) {
+ if (vsp->pmid == pmidlist[j])
+ break;
+ }
+ if (j == numpmid) {
+ fprintf(stderr,
+ "%s: rewrite_free: Arrgh, cannot find pmid %s in pmidlist[]\n",
+ pmProgname, pmIDStr(vsp->pmid));
+ exit(1);
+ }
+ mp = &metriclist[j];
+
+ if (vsp->numval > 0 && mp->mode == MODE_REWRITE) {
+ /*
+ * MODE_REWRITE implies the value was promoted to 64-bit
+ * and the pval in the pmResult came from the __pmStuffValue()
+ * call above, so free it here
+ */
+ for (j = 0; j < vsp->numval; j++) {
+ free(vsp->vlist[j].value.pval);
+ }
+ }
+
+ free(vsp);
+ }
+
+ free(orp);
+ orp = NULL;
+}
diff --git a/src/pmlogreduce/scan.c b/src/pmlogreduce/scan.c
new file mode 100644
index 0000000..e1bece7
--- /dev/null
+++ b/src/pmlogreduce/scan.c
@@ -0,0 +1,232 @@
+#include "pmlogreduce.h"
+
+static struct timeval last_tv = { 0, 0 };
+static int ictx_b = -1;
+
+extern struct timeval winstart_tval;
+
+/*
+ * This is the heart of the data reduction algorithm. The term
+ * metric-instance is used here to reflect the fact that this computation
+ * has to be performed for every instance of every metric.
+ *
+ * 1. need to look at every input archive record going forward from the
+ * current point up to the last one <= end (time) ... so no interp
+ * mode here
+ *
+ * 2. for counter metric-instances, look for and count "wraps"
+ *
+ * 3. for instantenous or discrete metric-instances with a numeric type,
+ * compute the arithmetic average of the observations over the
+ * interval
+ *
+ * 4. for _all_ metric-instances if there are no observations in the
+ * interval, then we'd like to supress this metric-instance from the
+ * output archive
+ *
+ * 5. all of the above has to be done in a way that makes sense in the
+ * presence of mark records
+ */
+
+void
+doscan(struct timeval *end)
+{
+ pmResult *rp;
+ value_t *vp;
+ int sts;
+ int i;
+ int ir;
+ int nr;
+
+ if (ictx_b == -1) {
+ /*
+ * first time, create the record at a time mode context for the
+ * input archive
+ */
+ if ((ictx_b = pmNewContext(PM_CONTEXT_ARCHIVE, iname)) < 0) {
+ fprintf(stderr, "%s: Error: cannot open archive \"%s\" (ctx_b): %s\n",
+ pmProgname, iname, pmErrStr(ictx_b));
+ exit(1);
+ }
+
+ if ((sts = pmSetMode(PM_MODE_FORW, NULL, 0)) < 0) {
+ fprintf(stderr,
+ "%s: Error: pmSetMode (ictx_b) failed: %s\n", pmProgname, pmErrStr(sts));
+ exit(1);
+ }
+ }
+
+ if ((sts = pmUseContext(ictx_b)) < 0) {
+ fprintf(stderr, "%s: doscan: Error: cannot use context: %s\n",
+ pmProgname, pmErrStr(sts));
+ exit(1);
+ }
+
+ for (i = 0; i < numpmid; i++) {
+ for (vp = metriclist[i].first; vp != NULL; vp = vp->next) {
+ vp->nobs = vp->nwrap = 0;
+ vp->control &= ~V_SEEN;
+ }
+ }
+
+ for (nr = 0; ; nr++) {
+
+ if ((sts = pmFetchArchive(&rp)) < 0) {
+ if (sts == PM_ERR_EOL)
+ break;
+ fprintf(stderr,
+ "%s: doscan: Error: pmFetch failed: %s\n", pmProgname, pmErrStr(sts));
+ exit(1);
+ }
+#if PCP_DEBUG
+ if (pmDebug & DBG_TRACE_APPL2) {
+ if (nr == 0) {
+ fprintf(stderr, "scan starts at ");
+ __pmPrintStamp(stderr, &rp->timestamp);
+ fprintf(stderr, "\n");
+ }
+ }
+#endif
+
+ if (rp->numpmid == 0) {
+ /*
+ * Mark record ... copy into the output file as we cannot
+ * pretend there is data between the previous data record
+ * and the next data record
+ * Logic copied from pmlogextract.
+ */
+ struct {
+ __pmPDU len;
+ __pmPDU type;
+ __pmPDU from;
+ __pmTimeval timestamp;
+ int numpmid; /* zero PMIDs to follow */
+ __pmPDU trailer;
+ } markrec;
+ /*
+ * add space for, but don't bump length for, trailer so
+ * __pmLogPutResult2() has space for trailer in the buffer
+ */
+ markrec.len = sizeof(markrec) - sizeof(__pmPDU);
+ markrec.type = markrec.from = 0;
+ markrec.timestamp.tv_sec = htonl(rp->timestamp.tv_sec);
+ markrec.timestamp.tv_usec = htonl(rp->timestamp.tv_usec);
+ markrec.numpmid = 0;
+ if ((sts = __pmLogPutResult2(&logctl, (__pmPDU *)&markrec)) < 0) {
+ fprintf(stderr, "%s: Error: __pmLogPutResult2: mark record write: %s\n",
+ pmProgname, pmErrStr(sts));
+ exit(1);
+ }
+ /*
+ * continue on to check the interval range ... numpmid == 0
+ * makes the rest of the loop safe
+ */
+ }
+
+ if (rp->timestamp.tv_sec > end->tv_sec ||
+ (rp->timestamp.tv_sec == end->tv_sec &&
+ rp->timestamp.tv_usec > end->tv_usec)) {
+ /*
+ * past the end of the interval, remember timestamp so we
+ * can resume here next time
+ */
+ last_tv = rp->timestamp; /* struct assignment */
+ pmFreeResult(rp);
+ break;
+ }
+
+ for (ir = 0; ir < rp->numpmid; ir++) {
+ pmValueSet *vsp;
+ int j;
+ metric_t *mp;
+
+ vsp = rp->vset[ir];
+ if (vsp->numval <= 0)
+ continue;
+
+ for (i = 0; i < numpmid; i++) {
+ if (vsp->pmid == pmidlist[i])
+ break;
+ }
+ if (i == numpmid) {
+ fprintf(stderr,
+ "%s: scan: Arrgh, cannot find pid %s in pidlist[]\n",
+ pmProgname, pmIDStr(vsp->pmid));
+ exit(1);
+ }
+ mp = &metriclist[i];
+ if (mp->mode == MODE_SKIP)
+ continue;
+
+ for (j = 0; j < vsp->numval; j++) {
+ value_t *lvp = NULL;
+ for (vp = mp->first; vp != NULL; vp = vp->next) {
+ if (vp->inst == vsp->vlist[j].inst)
+ break;
+ lvp = vp;
+ }
+ if (vp == NULL) {
+ vp = (value_t *)malloc(sizeof(value_t));
+ if (vp == NULL) {
+ fprintf(stderr,
+ "%s: rewrite: Arrgh, cannot malloc value_t\n", pmProgname);
+ exit(1);
+ }
+ if (lvp == NULL)
+ mp->first = vp;
+ else
+ lvp->next = vp;
+ vp->inst = vsp->vlist[j].inst;
+ vp->nobs = vp->nwrap = 0;
+ vp->control = V_INIT;
+ vp->next = NULL;
+#if PCP_DEBUG
+
+ if (pmDebug & DBG_TRACE_APPL1) {
+ fprintf(stderr,
+ "add value_t for %s (%s) inst %d\n",
+ namelist[i], pmIDStr(pmidlist[i]), vsp->vlist[j].inst);
+ }
+#endif
+ }
+ /* TODO ... hard part goes here 8^) */
+ if (mp->idesc.sem == PM_SEM_COUNTER) {
+ /*
+ * OK, this metric is a counter, scan each instance
+ * looking for potential wraps
+ */
+ ;
+ }
+#if PCP_DEBUG
+ if (pmDebug & DBG_TRACE_APPL1) {
+ __pmPrintStamp(stderr, &rp->timestamp);
+ fprintf(stderr, ": seen %s (%s) inst %d\n",
+ namelist[i], pmIDStr(pmidlist[i]),
+ vsp->vlist[j].inst);
+ }
+#endif
+ vp->control |= V_SEEN;
+ }
+ }
+
+ pmFreeResult(rp);
+ }
+#if PCP_DEBUG
+ if (pmDebug & DBG_TRACE_APPL2) {
+ fprintf(stderr, "scan ends at ");
+ __pmPrintStamp(stderr, &last_tv);
+ if (sts == PM_ERR_EOL)
+ fprintf(stderr, " [EOL]");
+ fprintf(stderr, " (%d records)\n", nr);
+ }
+#endif
+
+ if ((sts = pmSetMode(PM_MODE_FORW, &last_tv, 0)) < 0) {
+ fprintf(stderr,
+ "%s: doscan: Error: pmSetMode (ictx_b) time=", pmProgname);
+ __pmPrintStamp(stderr, &last_tv);
+ fprintf(stderr,
+ " failed: %s\n", pmErrStr(sts));
+ exit(1);
+ }
+}