Diffstat (limited to 'src/libpcp/src/interp.c')
-rw-r--r--  src/libpcp/src/interp.c  1714
1 files changed, 1714 insertions, 0 deletions
diff --git a/src/libpcp/src/interp.c b/src/libpcp/src/interp.c
new file mode 100644
index 0000000..ec49387
--- /dev/null
+++ b/src/libpcp/src/interp.c
@@ -0,0 +1,1714 @@
+/*
+ * Copyright (c) 1995,2004 Silicon Graphics, Inc. All Rights Reserved.
+ *
+ * This library is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as published
+ * by the Free Software Foundation; either version 2.1 of the License, or
+ * (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
+ * License for more details.
+ *
+ * Thread-safe notes:
+ *
+ * nr[] and nr_cache[] are diagnostic counters that are maintained with
+ * non-atomic updates ... we've decided that it is acceptable for their
+ * values to be subject to possible (but unlikely) missed updates
+ */
+
+/*
+ * Note: _FORTIFY_SOURCE cannot be set here because the calls to memcpy()
+ * with vp->vbuf as the destination raise a false-positive warning:
+ * the allocation for a pmValueBlock always extends vbuf[] to the
+ * correct size, not the [1] as per the declaration
+ * in pmapi.h
+ */
+#ifdef _FORTIFY_SOURCE
+#undef _FORTIFY_SOURCE
+#define _FORTIFY_SOURCE 0
+#endif
+
+#include <limits.h>
+#include <inttypes.h>
+#include <assert.h>
+#include "pmapi.h"
+#include "impl.h"
+
+#define UPD_MARK_NONE 0
+#define UPD_MARK_FORW 1
+#define UPD_MARK_BACK 2
+
+#if defined(HAVE_CONST_LONGLONG)
+#define SIGN_64_MASK 0x8000000000000000LL
+#else
+#define SIGN_64_MASK 0x8000000000000000
+#endif
+
+typedef union { /* value from pmResult */
+ pmValueBlock *pval;
+ int lval;
+} value;
+
+/*
+ * state values for s_prior and s_next
+ */
+#define S_UNDEFINED 0 /* no searching done yet */
+#define S_NOTFOUND 1 /* searched to end or start of archive and
+ did not find a value or a <mark> */
+#define S_MARK 2 /* searched and found <mark> before value
+ at t_prior or t_next */
+#define S_VALUE 3 /* searched and found value at t_prior or
+ t_next */
+
+static const char *statestr[] = { " <undefined>", " <notfound>", " <mark>", "" };
+
+
+typedef struct instcntl { /* metric-instance control */
+ struct instcntl *next; /* next for this metric control */
+ struct instcntl *want; /* ones of interest */
+ struct instcntl *unbound; /* not yet bound above [or below] */
+ int search; /* still searching for, or found, this one? */
+ int inst; /* instance identifier */
+ int inresult; /* will be in this result */
+ double t_prior;
+ int s_prior; /* state at t_prior */
+ value v_prior;
+ double t_next;
+ int s_next; /* state at t_next */
+ value v_next;
+ double t_first; /* no records before this */
+ double t_last; /* no records after this */
+ struct pmidcntl *metric; /* back to metric control */
+} instcntl_t;
+
+typedef struct pmidcntl { /* metric control */
+ pmDesc desc;
+ int valfmt; /* used to build result */
+ int numval; /* number of instances in this result */
+ struct instcntl *first; /* first metric-instance control */
+} pmidcntl_t;
+
+typedef struct {
+ pmResult *rp; /* cached pmResult from __pmLogRead */
+ int sts; /* from __pmLogRead */
+ FILE *mfp; /* log stream */
+ int vol; /* log volume */
+ long head_posn; /* posn in file before forwards __pmLogRead */
+ long tail_posn; /* posn in file after forwards __pmLogRead */
+ int mode; /* PM_MODE_FORW or PM_MODE_BACK */
+ int used; /* used count (diagnostic; entries are recycled round-robin) */
+} cache_t;
+
+#define NUMCACHE 4
+
+/*
+ * diagnostic counters ... indexed by PM_MODE_FORW (2) and
+ * PM_MODE_BACK (3), hence 4 elts for cached and non-cached reads
+ */
+static long nr_cache[PM_MODE_BACK+1];
+static long nr[PM_MODE_BACK+1];
+
+/*
+ * called with the context lock held
+ */
+static int
+cache_read(__pmArchCtl *acp, int mode, pmResult **rp)
+{
+ long posn;
+ cache_t *cp;
+ cache_t *lfup;
+ cache_t *cache;
+ int save_curvol;
+
+ if (acp->ac_vol == acp->ac_log->l_curvol) {
+ posn = ftell(acp->ac_log->l_mfp);
+ assert(posn >= 0);
+ }
+ else
+ posn = 0;
+
+ if (acp->ac_cache == NULL) {
+ /* cache initialization */
+ acp->ac_cache = cache = (cache_t *)malloc(NUMCACHE*sizeof(cache_t));
+ if (cache == NULL) {
+ __pmNoMem("cache_read", NUMCACHE * sizeof(cache_t), PM_FATAL_ERR);
+ /*NOTREACHED*/
+ }
+ for (cp = cache; cp < &cache[NUMCACHE]; cp++) {
+ cp->rp = NULL;
+ cp->mfp = NULL;
+ }
+ acp->ac_cache_idx = 0;
+ }
+ else
+ cache = acp->ac_cache;
+
+#ifdef PCP_DEBUG
+ if ((pmDebug & DBG_TRACE_LOG) && (pmDebug & DBG_TRACE_INTERP)) {
+ fprintf(stderr, "cache_read: fd=%d mode=%s vol=%d (curvol=%d) %s_posn=%ld ",
+ fileno(acp->ac_log->l_mfp),
+ mode == PM_MODE_FORW ? "forw" : "back",
+ acp->ac_vol, acp->ac_log->l_curvol,
+ mode == PM_MODE_FORW ? "head" : "tail",
+ (long)posn);
+ }
+#endif
+
+ acp->ac_cache_idx = (acp->ac_cache_idx + 1) % NUMCACHE;
+ lfup = &cache[acp->ac_cache_idx];
+ for (cp = cache; cp < &cache[NUMCACHE]; cp++) {
+ if (cp->mfp == acp->ac_log->l_mfp && cp->vol == acp->ac_vol &&
+ ((mode == PM_MODE_FORW && cp->head_posn == posn) ||
+ (mode == PM_MODE_BACK && cp->tail_posn == posn)) &&
+ cp->rp != NULL) {
+ int sts;
+ *rp = cp->rp;
+ cp->used++;
+ if (mode == PM_MODE_FORW)
+ fseek(acp->ac_log->l_mfp, cp->tail_posn, SEEK_SET);
+ else
+ fseek(acp->ac_log->l_mfp, cp->head_posn, SEEK_SET);
+#ifdef PCP_DEBUG
+ if ((pmDebug & DBG_TRACE_LOG) && (pmDebug & DBG_TRACE_INTERP)) {
+ __pmTimeval tmp;
+ double t_this;
+ tmp.tv_sec = (__int32_t)cp->rp->timestamp.tv_sec;
+ tmp.tv_usec = (__int32_t)cp->rp->timestamp.tv_usec;
+ t_this = __pmTimevalSub(&tmp, &acp->ac_log->l_label.ill_start);
+ fprintf(stderr, "hit cache[%d] t=%.6f\n",
+ (int)(cp - cache), t_this);
+ nr_cache[mode]++;
+ }
+#endif
+ sts = cp->sts;
+ return sts;
+ }
+ }
+
+#ifdef PCP_DEBUG
+ if ((pmDebug & DBG_TRACE_LOG) && (pmDebug & DBG_TRACE_INTERP))
+ fprintf(stderr, "miss\n");
+ nr[mode]++;
+#endif
+
+ if (lfup->rp != NULL)
+ pmFreeResult(lfup->rp);
+
+ save_curvol = acp->ac_log->l_curvol;
+
+ lfup->sts = __pmLogRead(acp->ac_log, mode, NULL, &lfup->rp, PMLOGREAD_NEXT);
+ if (lfup->sts < 0)
+ lfup->rp = NULL;
+ *rp = lfup->rp;
+
+ if (posn == 0 || save_curvol != acp->ac_log->l_curvol) {
+ /*
+ * vol switch since last time, or vol switch in __pmLogRead() ...
+ * new vol, stdio stream and we don't know where we started from
+ * ... don't cache
+ */
+ lfup->mfp = NULL;
+#ifdef PCP_DEBUG
+ if ((pmDebug & DBG_TRACE_LOG) && (pmDebug & DBG_TRACE_INTERP))
+ fprintf(stderr, "cache_read: reload vol switch, mark cache[%d] unused\n",
+ (int)(lfup - cache));
+#endif
+ }
+ else {
+ lfup->mode = mode;
+ lfup->vol = acp->ac_vol;
+ lfup->mfp = acp->ac_log->l_mfp;
+ lfup->used = 1;
+ if (mode == PM_MODE_FORW) {
+ lfup->head_posn = posn;
+ lfup->tail_posn = ftell(acp->ac_log->l_mfp);
+ assert(lfup->tail_posn >= 0);
+ }
+ else {
+ lfup->tail_posn = posn;
+ lfup->head_posn = ftell(acp->ac_log->l_mfp);
+ assert(lfup->head_posn >= 0);
+ }
+#ifdef PCP_DEBUG
+ if ((pmDebug & DBG_TRACE_LOG) && (pmDebug & DBG_TRACE_INTERP)) {
+ fprintf(stderr, "cache_read: reload cache[%d] vol=%d (curvol=%d) head=%ld tail=%ld ",
+ (int)(lfup - cache), lfup->vol, acp->ac_log->l_curvol,
+ (long)lfup->head_posn, (long)lfup->tail_posn);
+ if (lfup->sts == 0)
+ fprintf(stderr, "sts=%d\n", lfup->sts);
+ else {
+ char errmsg[PM_MAXERRMSGLEN];
+ fprintf(stderr, "sts=%s\n", pmErrStr_r(lfup->sts, errmsg, sizeof(errmsg)));
+ }
+ }
+#endif
+ }
+
+ return lfup->sts;
+}
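
/*
 * Editorial sketch, not part of the commit: the call pattern cache_read()
 * expects, as used by do_roll() and __pmLogFetchInterp() below ... the
 * stream must already be positioned at a record boundary (a remembered
 * offset in a known volume), and the returned pmResult is owned by the
 * read cache, so callers must not free it.  The function name is
 * hypothetical.
 */
static void
example_scan_forw(__pmArchCtl *acp)
{
    pmResult    *logrp;

    __pmLogChangeVol(acp->ac_log, acp->ac_vol);
    fseek(acp->ac_log->l_mfp, acp->ac_offset, SEEK_SET);
    while (cache_read(acp, PM_MODE_FORW, &logrp) >= 0) {
        /* inspect logrp->timestamp here and break once past the
         * time of interest ... do NOT call pmFreeResult(logrp) */
    }
}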
+
+void
+__pmLogCacheClear(FILE *mfp)
+{
+ /* retired ... functionality moved to __pmFreeInterpData() */
+ return;
+}
+
+#ifdef PCP_DEBUG
+/*
+ * prior == 1 for ?_prior fields, else use ?_next fields
+ */
+static void
+dumpval(FILE *f, int type, int valfmt, int prior, instcntl_t *icp)
+{
+ int mark;
+ value *vp;
+ if (prior) {
+ if (icp->t_prior == -1) {
+ fprintf(f, " <undefined>");
+ return;
+ }
+ mark = icp->s_prior == S_MARK;
+ vp = &icp->v_prior;
+ }
+ else {
+ if (icp->t_next == -1) {
+ fprintf(f, " <undefined>");
+ return;
+ }
+ mark = icp->s_next == S_MARK;
+ vp = &icp->v_next;
+ }
+ if (mark) {
+ fprintf(f, " <mark>");
+ return;
+ }
+ if (type == PM_TYPE_32 || type == PM_TYPE_U32)
+ fprintf(f, " v=%d", vp->lval);
+ else if (type == PM_TYPE_FLOAT && valfmt == PM_VAL_INSITU) {
+ float tmp;
+ memcpy((void *)&tmp, (void *)&vp->lval, sizeof(tmp));
+ fprintf(f, " v=%f", (double)tmp);
+ }
+ else if (type == PM_TYPE_64) {
+ __int64_t tmp;
+ memcpy((void *)&tmp, (void *)vp->pval->vbuf, sizeof(tmp));
+ fprintf(f, " v=%"PRIi64, tmp);
+ }
+ else if (type == PM_TYPE_U64) {
+ __uint64_t tmp;
+ memcpy((void *)&tmp, (void *)vp->pval->vbuf, sizeof(tmp));
+ fprintf(f, " v=%"PRIu64, tmp);
+ }
+ else if (type == PM_TYPE_FLOAT) {
+ float tmp;
+ memcpy((void *)&tmp, (void *)vp->pval->vbuf, sizeof(tmp));
+ fprintf(f, " v=%f", (double)tmp);
+ }
+ else if (type == PM_TYPE_DOUBLE) {
+ double tmp;
+ memcpy((void *)&tmp, (void *)vp->pval->vbuf, sizeof(tmp));
+ fprintf(f, " v=%f", tmp);
+ }
+ else
+ fprintf(f, "v=??? (lval=%d)", vp->lval);
+}
+#endif
+
+static void
+update_bounds(__pmContext *ctxp, double t_req, pmResult *logrp, int do_mark, int *done_prior, int *done_next)
+{
+ /*
+ * for every metric in the result from the log
+ * for every instance in the result from the log
+ * if we have ever asked for this metric and instance, update the
+ * range bounds, if necessary
+ */
+ int k;
+ int i;
+ __pmHashCtl *hcp = &ctxp->c_archctl->ac_pmid_hc;
+ __pmHashNode *hp;
+ pmidcntl_t *pcp;
+ instcntl_t *icp;
+ double t_this;
+ __pmTimeval tmp;
+ int changed;
+
+ tmp.tv_sec = (__int32_t)logrp->timestamp.tv_sec;
+ tmp.tv_usec = (__int32_t)logrp->timestamp.tv_usec;
+ t_this = __pmTimevalSub(&tmp, &ctxp->c_archctl->ac_log->l_label.ill_start);
+
+ if (logrp->numpmid == 0 && do_mark != UPD_MARK_NONE) {
+ /* mark record, discontinuity in log */
+ for (icp = (instcntl_t *)ctxp->c_archctl->ac_want; icp != NULL; icp = icp->want) {
+ if (t_this <= t_req &&
+ (t_this >= icp->t_prior || icp->t_prior > t_req)) {
+ /* <mark> is closer than best lower bound to date */
+ icp->t_prior = t_this;
+ icp->s_prior = S_MARK;
+ if (icp->metric->valfmt != PM_VAL_INSITU) {
+ if (icp->v_prior.pval != NULL)
+ __pmUnpinPDUBuf((void *)icp->v_prior.pval);
+ icp->v_prior.pval = NULL;
+ }
+#ifdef PCP_DEBUG
+ if (pmDebug & DBG_TRACE_INTERP) {
+ char strbuf[20];
+ fprintf(stderr, "pmid %s inst %d <mark> t_prior=%.6f t_first=%.6f t_last=%.6f\n",
+ pmIDStr_r(icp->metric->desc.pmid, strbuf, sizeof(strbuf)), icp->inst, icp->t_prior, icp->t_first, icp->t_last);
+ }
+#endif
+ if (icp->search && done_prior != NULL) {
+ icp->search = 0;
+ (*done_prior)++;
+ }
+ }
+ if (t_this >= t_req &&
+ ((t_this <= icp->t_next || icp->t_next < 0) ||
+ icp->t_next < t_req)) {
+ /* <mark> is closer than best upper bound to date */
+ icp->t_next = t_this;
+ icp->s_next = S_MARK;
+ if (icp->metric->valfmt != PM_VAL_INSITU) {
+ if (icp->v_next.pval != NULL)
+ __pmUnpinPDUBuf((void *)icp->v_next.pval);
+ icp->v_next.pval = NULL;
+ }
+#ifdef PCP_DEBUG
+ if (pmDebug & DBG_TRACE_INTERP) {
+ char strbuf[20];
+ fprintf(stderr, "pmid %s inst %d <mark> t_next=%.6f t_first=%.6f t_last=%.6f\n",
+ pmIDStr_r(icp->metric->desc.pmid, strbuf, sizeof(strbuf)), icp->inst, icp->t_next, icp->t_first, icp->t_last);
+ }
+#endif
+ if (icp->search && done_next != NULL) {
+ icp->search = 0;
+ (*done_next)++;
+ }
+ }
+ }
+ return;
+ }
+
+ changed = 0;
+ for (k = 0; k < logrp->numpmid; k++) {
+ hp = __pmHashSearch((int)logrp->vset[k]->pmid, hcp);
+ if (hp == NULL)
+ continue;
+ pcp = (pmidcntl_t *)hp->data;
+ if (pcp->valfmt == -1 && logrp->vset[k]->numval > 0)
+ pcp->valfmt = logrp->vset[k]->valfmt;
+ for (icp = pcp->first; icp != NULL; icp = icp->next) {
+ for (i = 0; i < logrp->vset[k]->numval; i++) {
+ if (logrp->vset[k]->vlist[i].inst == icp->inst ||
+ icp->inst == PM_IN_NULL) {
+ /* matched on instance */
+#ifdef PCP_DEBUG
+ if ((pmDebug & DBG_TRACE_INTERP) && (pmDebug & DBG_TRACE_DESPERATE)) {
+ char strbuf[20];
+ fprintf(stderr, "update: match pmid %s inst %d t_this=%.6f t_prior=%.6f t_next=%.6f t_first=%.6f t_last=%.6f\n",
+ pmIDStr_r(logrp->vset[k]->pmid, strbuf, sizeof(strbuf)), icp->inst,
+ t_this, icp->t_prior, icp->t_next,
+ icp->t_first, icp->t_last);
+ }
+#endif
+ if (t_this <= t_req &&
+ (icp->t_prior > t_req || t_this >= icp->t_prior)) {
+ /*
+ * at or before the requested time, and this is the
+ * closest-to-date lower bound
+ */
+ changed = 1;
+ if (icp->t_prior < icp->t_next && icp->t_prior >= t_req) {
+ /* shuffle prior to next */
+ icp->t_next = icp->t_prior;
+ if (pcp->valfmt == PM_VAL_INSITU)
+ icp->v_next.lval = icp->v_prior.lval;
+ else {
+ if (icp->v_next.pval != NULL)
+ __pmUnpinPDUBuf((void *)icp->v_next.pval);
+ icp->v_next.pval = icp->v_prior.pval;
+ }
+ }
+ icp->t_prior = t_this;
+ icp->s_prior = S_VALUE;
+ if (pcp->valfmt == PM_VAL_INSITU)
+ icp->v_prior.lval = logrp->vset[k]->vlist[i].value.lval;
+ else {
+ if (icp->v_prior.pval != NULL)
+ __pmUnpinPDUBuf((void *)icp->v_prior.pval);
+ icp->v_prior.pval = logrp->vset[k]->vlist[i].value.pval;
+ __pmPinPDUBuf((void *)icp->v_prior.pval);
+ }
+ if (icp->search && done_prior != NULL) {
+ /* one we were looking for */
+ changed |= 2;
+ icp->search = 0;
+ (*done_prior)++;
+ }
+ }
+ if (t_this >= t_req &&
+ (icp->t_next < t_req || t_this <= icp->t_next)) {
+ /*
+ * at or after the requested time, and this is the
+ * closest-to-date upper bound
+ */
+ changed |= 1;
+ if (icp->t_prior < icp->t_next && icp->t_next <= t_req) {
+ /* shuffle next to prior */
+ icp->t_prior = icp->t_next;
+ icp->s_prior = icp->s_next;
+ if (pcp->valfmt == PM_VAL_INSITU)
+ icp->v_prior.lval = icp->v_next.lval;
+ else {
+ if (icp->v_prior.pval != NULL)
+ __pmUnpinPDUBuf((void *)icp->v_prior.pval);
+ icp->v_prior.pval = icp->v_next.pval;
+ }
+ }
+ icp->t_next = t_this;
+ icp->s_next = S_VALUE;
+ if (pcp->valfmt == PM_VAL_INSITU)
+ icp->v_next.lval = logrp->vset[k]->vlist[i].value.lval;
+ else {
+ if (icp->v_next.pval != NULL)
+ __pmUnpinPDUBuf((void *)icp->v_next.pval);
+ icp->v_next.pval = logrp->vset[k]->vlist[i].value.pval;
+ __pmPinPDUBuf((void *)icp->v_next.pval);
+ }
+ if (icp->search && done_next != NULL) {
+ /* one we were looking for */
+ changed |= 2;
+ icp->search = 0;
+ (*done_next)++;
+ }
+ }
+#ifdef PCP_DEBUG
+ if ((pmDebug & DBG_TRACE_INTERP) && changed) {
+ char strbuf[20];
+ fprintf(stderr, "update%s pmid %s inst %d prior: t=%.6f",
+ changed & 2 ? "+search" : "",
+ pmIDStr_r(logrp->vset[k]->pmid, strbuf, sizeof(strbuf)), icp->inst, icp->t_prior);
+ dumpval(stderr, pcp->desc.type, icp->metric->valfmt, 1, icp);
+ fprintf(stderr, " next: t=%.6f", icp->t_next);
+ dumpval(stderr, pcp->desc.type, icp->metric->valfmt, 0, icp);
+ fprintf(stderr, " t_first=%.6f t_last=%.6f\n",
+ icp->t_first, icp->t_last);
+ }
+#endif
+ goto next_inst;
+ }
+ }
+next_inst:
+ ;
+ }
+ }
+
+ return;
+}
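
/*
 * Editorial sketch, not part of the commit, with hypothetical numbers:
 * what the bounds maintained above feed into.  If update_bounds() has
 * recorded a value of 100 at t_prior=10 and a value of 200 at t_next=20,
 * a request at t_req=15 for a COUNTER metric is satisfied by linear
 * interpolation in __pmLogFetchInterp() below.
 */
static double
example_interp(void)
{
    double      t_prior = 10.0, t_next = 20.0, t_req = 15.0;
    double      v_prior = 100.0, v_next = 200.0;

    /* same form as the interpolation cases in __pmLogFetchInterp() */
    return v_prior + (t_req - t_prior) * (v_next - v_prior) /
           (t_next - t_prior);          /* = 150.0 */
}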
+
+static void
+do_roll(__pmContext *ctxp, double t_req)
+{
+ pmResult *logrp;
+ __pmTimeval tmp;
+ double t_this;
+
+ /*
+ * now roll forwards in the direction of log reading
+ * to make sure we are up to t_req
+ */
+ if (ctxp->c_delta > 0) {
+ while (cache_read(ctxp->c_archctl, PM_MODE_FORW, &logrp) >= 0) {
+ tmp.tv_sec = (__int32_t)logrp->timestamp.tv_sec;
+ tmp.tv_usec = (__int32_t)logrp->timestamp.tv_usec;
+ t_this = __pmTimevalSub(&tmp, &ctxp->c_archctl->ac_log->l_label.ill_start);
+ if (t_this > t_req)
+ break;
+
+#ifdef PCP_DEBUG
+ if (pmDebug & DBG_TRACE_INTERP)
+ fprintf(stderr, "roll forw to t=%.6f%s\n",
+ t_this, logrp->numpmid == 0 ? " <mark>" : "");
+#endif
+ ctxp->c_archctl->ac_offset = ftell(ctxp->c_archctl->ac_log->l_mfp);
+ assert(ctxp->c_archctl->ac_offset >= 0);
+ ctxp->c_archctl->ac_vol = ctxp->c_archctl->ac_log->l_curvol;
+ update_bounds(ctxp, t_req, logrp, UPD_MARK_FORW, NULL, NULL);
+ }
+ }
+ else {
+ while (cache_read(ctxp->c_archctl, PM_MODE_BACK, &logrp) >= 0) {
+ tmp.tv_sec = (__int32_t)logrp->timestamp.tv_sec;
+ tmp.tv_usec = (__int32_t)logrp->timestamp.tv_usec;
+ t_this = __pmTimevalSub(&tmp, &ctxp->c_archctl->ac_log->l_label.ill_start);
+ if (t_this < t_req)
+ break;
+
+#ifdef PCP_DEBUG
+ if (pmDebug & DBG_TRACE_INTERP)
+ fprintf(stderr, "roll back to t=%.6f%s\n",
+ t_this, logrp->numpmid == 0 ? " <mark>" : "");
+#endif
+ ctxp->c_archctl->ac_offset = ftell(ctxp->c_archctl->ac_log->l_mfp);
+ assert(ctxp->c_archctl->ac_offset >= 0);
+ ctxp->c_archctl->ac_vol = ctxp->c_archctl->ac_log->l_curvol;
+ update_bounds(ctxp, t_req, logrp, UPD_MARK_BACK, NULL, NULL);
+ }
+ }
+}
+
+#define pmXTBdeltaToTimeval(d, m, t) { \
+ (t)->tv_sec = 0; \
+ (t)->tv_usec = (long)0; \
+ switch(PM_XTB_GET(m)) { \
+ case PM_TIME_NSEC: (t)->tv_usec = (long)((d) / 1000); break; \
+ case PM_TIME_USEC: (t)->tv_usec = (long)(d); break; \
+ case PM_TIME_MSEC: (t)->tv_sec = (d) / 1000; (t)->tv_usec = (long)(1000 * ((d) % 1000)); break; \
+ case PM_TIME_SEC: (t)->tv_sec = (d); break; \
+ case PM_TIME_MIN: (t)->tv_sec = (d) * 60; break; \
+ case PM_TIME_HOUR: (t)->tv_sec = (d) * 3600; break; \
+ default: (t)->tv_sec = (d) / 1000; (t)->tv_usec = (long)(1000 * ((d) % 1000)); break; \
+ } \
+}
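
/*
 * Editorial sketch, not part of the commit, with hypothetical values:
 * what pmXTBdeltaToTimeval() produces.  This assumes the PM_XTB_SET()
 * and PM_XTB_GET() encoding from impl.h, where a mode without the XTB
 * flag falls through to the default (milliseconds) case.
 */
static void
example_delta(void)
{
    struct timeval  tv;

    /* plain mode: delta treated as msec, 2500 -> tv_sec=2, tv_usec=500000 */
    pmXTBdeltaToTimeval(2500, PM_MODE_INTERP, &tv);

    /* explicit unit: 3 minutes -> tv_sec=180, tv_usec=0 */
    pmXTBdeltaToTimeval(3, PM_MODE_INTERP | PM_XTB_SET(PM_TIME_MIN), &tv);
}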
+
+int
+__pmLogFetchInterp(__pmContext *ctxp, int numpmid, pmID pmidlist[], pmResult **result)
+{
+ int i;
+ int j;
+ int sts;
+ double t_req;
+ double t_this;
+ pmResult *rp;
+ pmResult *logrp;
+ __pmHashCtl *hcp = &ctxp->c_archctl->ac_pmid_hc;
+ __pmHashNode *hp;
+ pmidcntl_t *pcp = NULL; /* initialize to pander to gcc */
+ instcntl_t *icp;
+ int back = 0;
+ int forw = 0;
+ int done;
+ int done_roll;
+ static int dowrap = -1;
+ __pmTimeval tmp;
+ struct timeval delta_tv;
+
+ PM_INIT_LOCKS();
+ PM_LOCK(__pmLock_libpcp);
+ if (dowrap == -1) {
+ /* PCP_COUNTER_WRAP in environment enables "counter wrap" logic */
+ if (getenv("PCP_COUNTER_WRAP") == NULL)
+ dowrap = 0;
+ else
+ dowrap = 1;
+ }
+ PM_UNLOCK(__pmLock_libpcp);
+
+ t_req = __pmTimevalSub(&ctxp->c_origin, &ctxp->c_archctl->ac_log->l_label.ill_start);
+
+#ifdef PCP_DEBUG
+ if (pmDebug & DBG_TRACE_INTERP) {
+ fprintf(stderr, "__pmLogFetchInterp @ ");
+ __pmPrintTimeval(stderr, &ctxp->c_origin);
+ fprintf(stderr, " t_req=%.6f curvol=%d posn=%ld (vol=%d) serial=%d\n",
+ t_req, ctxp->c_archctl->ac_log->l_curvol,
+ (long)ctxp->c_archctl->ac_offset, ctxp->c_archctl->ac_vol,
+ ctxp->c_archctl->ac_serial);
+ nr_cache[PM_MODE_FORW] = nr[PM_MODE_FORW] = 0;
+ nr_cache[PM_MODE_BACK] = nr[PM_MODE_BACK] = 0;
+ }
+#endif
+
+ /*
+ * the 0.001 is magic slop for 1 msec, which is about as accurate
+ * as we can expect any of this timing stuff to be ...
+ */
+ if (t_req < -0.001) {
+ sts = PM_ERR_EOL;
+ goto all_done;
+ }
+
+ if (t_req > ctxp->c_archctl->ac_end + 0.001) {
+ struct timeval end;
+ __pmTimeval tmp;
+
+ /*
+ * Past end of archive ... see if it has grown since we last looked.
+ */
+ if (pmGetArchiveEnd(&end) >= 0) {
+ tmp.tv_sec = (__int32_t)end.tv_sec;
+ tmp.tv_usec = (__int32_t)end.tv_usec;
+ ctxp->c_archctl->ac_end = __pmTimevalSub(&tmp, &ctxp->c_archctl->ac_log->l_label.ill_start);
+ }
+ if (t_req > ctxp->c_archctl->ac_end) {
+ sts = PM_ERR_EOL;
+ goto all_done;
+ }
+ }
+
+ if ((rp = (pmResult *)malloc(sizeof(pmResult) + (numpmid - 1) * sizeof(pmValueSet *))) == NULL)
+ return -oserror();
+
+ rp->timestamp.tv_sec = ctxp->c_origin.tv_sec;
+ rp->timestamp.tv_usec = ctxp->c_origin.tv_usec;
+ rp->numpmid = numpmid;
+
+ /* zeroth pass ... clear search and inresult flags */
+ for (j = 0; j < hcp->hsize; j++) {
+ for (hp = hcp->hash[j]; hp != NULL; hp = hp->next) {
+ pcp = (pmidcntl_t *)hp->data;
+ for (icp = pcp->first; icp != NULL; icp = icp->next) {
+ icp->search = icp->inresult = 0;
+ icp->unbound = icp->want = NULL;
+ }
+ }
+ }
+
+ /*
+ * first pass ... scan all metrics, establish which ones are in
+ * the log, and which instances are being requested ... also build
+ * the skeletal pmResult
+ */
+ ctxp->c_archctl->ac_want = NULL;
+ for (j = 0; j < numpmid; j++) {
+ if (pmidlist[j] == PM_ID_NULL)
+ continue;
+ hp = __pmHashSearch((int)pmidlist[j], hcp);
+ if (hp == NULL) {
+ /* first time we've been asked for this one in this context */
+ if ((pcp = (pmidcntl_t *)malloc(sizeof(pmidcntl_t))) == NULL) {
+ __pmNoMem("__pmLogFetchInterp.pmidcntl_t", sizeof(pmidcntl_t), PM_FATAL_ERR);
+ /*NOTREACHED*/
+ }
+ pcp->valfmt = -1;
+ pcp->first = NULL;
+ sts = __pmHashAdd((int)pmidlist[j], (void *)pcp, hcp);
+ if (sts < 0) {
+ rp->numpmid = j;
+ pmFreeResult(rp);
+ free(pcp);
+ return sts;
+ }
+ sts = __pmLogLookupDesc(ctxp->c_archctl->ac_log, pmidlist[j], &pcp->desc);
+ if (sts < 0)
+ /* not in the archive log */
+ pcp->desc.type = -1;
+ else {
+ /* enumerate all the instances from the domain underneath */
+ int *instlist = NULL;
+ char **namelist = NULL;
+ instcntl_t *lcp;
+ if (pcp->desc.indom == PM_INDOM_NULL) {
+ sts = 1;
+ if ((instlist = (int *)malloc(sizeof(int))) == NULL) {
+ __pmNoMem("__pmLogFetchInterp.instlist", sizeof(int), PM_FATAL_ERR);
+ }
+ instlist[0] = PM_IN_NULL;
+ }
+ else {
+ sts = pmGetInDomArchive(pcp->desc.indom, &instlist, &namelist);
+ }
+ lcp = NULL;
+ for (i = 0; i < sts; i++) {
+ if ((icp = (instcntl_t *)malloc(sizeof(instcntl_t))) == NULL) {
+ __pmNoMem("__pmLogFetchInterp.instcntl_t", sizeof(instcntl_t), PM_FATAL_ERR);
+ }
+ if (lcp)
+ lcp->next = icp;
+ else
+ pcp->first = icp;
+ lcp = icp;
+ icp->metric = pcp;
+ icp->inresult = icp->search = 0;
+ icp->next = icp->want = icp->unbound = NULL;
+ icp->inst = instlist[i];
+ icp->t_prior = icp->t_next = icp->t_first = icp->t_last = -1;
+ icp->s_prior = icp->s_next = S_MARK;
+ icp->v_prior.pval = icp->v_next.pval = NULL;
+ }
+ if (instlist != NULL)
+ free(instlist);
+ if (namelist != NULL)
+ free(namelist);
+ }
+ }
+ else
+ /* seen this one before */
+ pcp = (pmidcntl_t *)hp->data;
+
+ pcp->numval = 0;
+ if (pcp->desc.type == -1) {
+ pcp->numval = PM_ERR_PMID_LOG;
+ }
+ else if (pcp->desc.indom != PM_INDOM_NULL) {
+ /* use the profile to filter the instances to be returned */
+ for (icp = pcp->first; icp != NULL; icp = icp->next) {
+ if (__pmInProfile(pcp->desc.indom, ctxp->c_instprof, icp->inst)) {
+ icp->inresult = 1;
+ icp->want = (instcntl_t *)ctxp->c_archctl->ac_want;
+ ctxp->c_archctl->ac_want = icp;
+ pcp->numval++;
+ }
+ else
+ icp->inresult = 0;
+ }
+ }
+ else {
+ pcp->first->inresult = 1;
+ pcp->first->want = (instcntl_t *)ctxp->c_archctl->ac_want;
+ ctxp->c_archctl->ac_want = pcp->first;
+ pcp->numval = 1;
+ }
+ }
+
+ if (ctxp->c_archctl->ac_serial == 0) {
+ /* need gross positioning from temporal index */
+ __pmLogSetTime(ctxp);
+ ctxp->c_archctl->ac_offset = ftell(ctxp->c_archctl->ac_log->l_mfp);
+ assert(ctxp->c_archctl->ac_offset >= 0);
+ ctxp->c_archctl->ac_vol = ctxp->c_archctl->ac_log->l_curvol;
+
+ /*
+ * and now fine-tuning ...
+ * back-up (relative to the direction we are reading the log)
+ * to make sure
+ */
+ if (ctxp->c_delta > 0) {
+ while (cache_read(ctxp->c_archctl, PM_MODE_BACK, &logrp) >= 0) {
+ tmp.tv_sec = (__int32_t)logrp->timestamp.tv_sec;
+ tmp.tv_usec = (__int32_t)logrp->timestamp.tv_usec;
+ t_this = __pmTimevalSub(&tmp, &ctxp->c_archctl->ac_log->l_label.ill_start);
+ if (t_this <= t_req) {
+ break;
+ }
+ ctxp->c_archctl->ac_offset = ftell(ctxp->c_archctl->ac_log->l_mfp);
+ assert(ctxp->c_archctl->ac_offset >= 0);
+ ctxp->c_archctl->ac_vol = ctxp->c_archctl->ac_log->l_curvol;
+ update_bounds(ctxp, t_req, logrp, UPD_MARK_NONE, NULL, NULL);
+ }
+ }
+ else {
+ while (cache_read(ctxp->c_archctl, PM_MODE_FORW, &logrp) >= 0) {
+ tmp.tv_sec = (__int32_t)logrp->timestamp.tv_sec;
+ tmp.tv_usec = (__int32_t)logrp->timestamp.tv_usec;
+ t_this = __pmTimevalSub(&tmp, &ctxp->c_archctl->ac_log->l_label.ill_start);
+ if (t_this > t_req) {
+ break;
+ }
+ ctxp->c_archctl->ac_offset = ftell(ctxp->c_archctl->ac_log->l_mfp);
+ assert(ctxp->c_archctl->ac_offset >= 0);
+ ctxp->c_archctl->ac_vol = ctxp->c_archctl->ac_log->l_curvol;
+ update_bounds(ctxp, t_req, logrp, UPD_MARK_NONE, NULL, NULL);
+ }
+ }
+ ctxp->c_archctl->ac_serial = 1;
+ }
+
+ /* get to the last remembered place */
+ __pmLogChangeVol(ctxp->c_archctl->ac_log, ctxp->c_archctl->ac_vol);
+ fseek(ctxp->c_archctl->ac_log->l_mfp, ctxp->c_archctl->ac_offset, SEEK_SET);
+
+ /*
+ * optimization to suppress roll forwards unless really needed ...
+ * if the sample interval is much shorter than the time between log
+ * records, then do not roll forwards unless some scanning is
+ * required ... and if scanning is required in the "forwards"
+ * direction, no need to roll forwards
+ */
+ done_roll = 0;
+
+ /*
+ * second pass ... see which metrics are not currently bounded below
+ */
+ ctxp->c_archctl->ac_unbound = NULL;
+ for (j = 0; j < numpmid; j++) {
+ if (pmidlist[j] == PM_ID_NULL)
+ continue;
+ hp = __pmHashSearch((int)pmidlist[j], hcp);
+ assert(hp != NULL);
+ pcp = (pmidcntl_t *)hp->data;
+ if (pcp->numval > 0) {
+ for (icp = pcp->first; icp != NULL; icp = icp->next) {
+ if (!icp->inresult)
+ continue;
+ if (icp->t_first >= 0 && t_req < icp->t_first)
+ /* before earliest, don't bother */
+ continue;
+retry_back:
+ /*
+ * At this stage there _may_ be a value earlier in the
+ * archive of interest ...
+ * t_prior = -1 => have not explored in this direction,
+ * so need to go back
+ * t_prior > t_req => need to push t_prior to be <= t_req
+ * if possible, so go back
+ * t_next is valid and a mark and t_next > t_req => need
+ * to search back also
+ */
+ if (icp->t_prior < 0 || icp->t_prior > t_req ||
+ (icp->t_next >= 0 && icp->s_next == S_MARK && icp->t_next > t_req)) {
+ if (back == 0 && !done_roll) {
+ done_roll = 1;
+ if (ctxp->c_delta > 0) {
+ /* forwards before scanning back */
+ do_roll(ctxp, t_req);
+ goto retry_back;
+ }
+ }
+ back++;
+ icp->search = 1;
+ icp->unbound = (instcntl_t *)ctxp->c_archctl->ac_unbound;
+ ctxp->c_archctl->ac_unbound = icp;
+#ifdef PCP_DEBUG
+ if (pmDebug & DBG_TRACE_INTERP) {
+ char strbuf[20];
+ fprintf(stderr, "search back for inst %d and pmid %s (t_first=%.6f t_prior=%.6f%s t_next=%.6f%s t_last=%.6f)\n",
+ icp->inst, pmIDStr_r(pmidlist[j], strbuf, sizeof(strbuf)), icp->t_first,
+ icp->t_prior, statestr[icp->s_prior],
+ icp->t_next, statestr[icp->s_next],
+ icp->t_last);
+ }
+#endif
+ }
+ }
+ }
+ }
+
+ if (back) {
+ /*
+ * at least one metric requires a bound from earlier in the log ...
+ * position ourselves, ... and search
+ */
+ __pmLogChangeVol(ctxp->c_archctl->ac_log, ctxp->c_archctl->ac_vol);
+ fseek(ctxp->c_archctl->ac_log->l_mfp, ctxp->c_archctl->ac_offset, SEEK_SET);
+ done = 0;
+
+ while (done < back) {
+ if (cache_read(ctxp->c_archctl, PM_MODE_BACK, &logrp) < 0) {
+ /* ran into start of log */
+#ifdef PCP_DEBUG
+ if (pmDebug & DBG_TRACE_INTERP) {
+ fprintf(stderr, "Start of Log, %d metric-inst not found\n",
+ back - done);
+ }
+#endif
+ break;
+ }
+ tmp.tv_sec = (__int32_t)logrp->timestamp.tv_sec;
+ tmp.tv_usec = (__int32_t)logrp->timestamp.tv_usec;
+ t_this = __pmTimevalSub(&tmp, &ctxp->c_archctl->ac_log->l_label.ill_start);
+ if (ctxp->c_delta < 0 && t_this >= t_req) {
+ /* going backwards, and not up to t_req yet */
+ ctxp->c_archctl->ac_offset = ftell(ctxp->c_archctl->ac_log->l_mfp);
+ assert(ctxp->c_archctl->ac_offset >= 0);
+ ctxp->c_archctl->ac_vol = ctxp->c_archctl->ac_log->l_curvol;
+ }
+ update_bounds(ctxp, t_req, logrp, UPD_MARK_BACK, &done, NULL);
+
+ /*
+ * forget about those that can never be found from here
+ * in this direction
+ */
+ for (icp = (instcntl_t *)ctxp->c_archctl->ac_unbound; icp != NULL; icp = icp->unbound) {
+ if (icp->search && t_this <= icp->t_first) {
+ icp->search = 0;
+ done++;
+ }
+ }
+ }
+ /* end of search, trim t_first as required */
+ for (icp = (instcntl_t *)ctxp->c_archctl->ac_unbound; icp != NULL; icp = icp->unbound) {
+ if ((icp->t_prior == -1 || icp->t_prior > t_req) &&
+ icp->t_first < t_req) {
+ icp->t_first = t_req;
+#ifdef PCP_DEBUG
+ if (pmDebug & DBG_TRACE_INTERP) {
+ char strbuf[20];
+ fprintf(stderr, "pmid %s inst %d no values before t_first=%.6f\n",
+ pmIDStr_r(icp->metric->desc.pmid, strbuf, sizeof(strbuf)), icp->inst, icp->t_first);
+ }
+#endif
+ }
+ icp->search = 0;
+ }
+ }
+
+ /*
+ * third pass ... see which metrics are not currently bounded above
+ */
+ ctxp->c_archctl->ac_unbound = NULL;
+ for (j = 0; j < numpmid; j++) {
+ if (pmidlist[j] == PM_ID_NULL)
+ continue;
+ hp = __pmHashSearch((int)pmidlist[j], hcp);
+ assert(hp != NULL);
+ pcp = (pmidcntl_t *)hp->data;
+ if (pcp->numval > 0) {
+ for (icp = pcp->first; icp != NULL; icp = icp->next) {
+ if (!icp->inresult)
+ continue;
+ if (icp->t_last >= 0 && t_req > icp->t_last)
+ /* after latest, don't bother */
+ continue;
+retry_forw:
+ /*
+ * At this stage there _may_ be a value later in the
+ * archive of interest ...
+ * t_next = -1 => have not explored in this direction,
+ * so need to go forward
+ * t_next < t_req => need to push t_next to be >= t_req
+ * if possible, so go forward
+ * t_prior is valid and a mark and t_prior < t_req => need
+ * to search forward also
+ */
+ if (icp->t_next < 0 || icp->t_next < t_req ||
+ (icp->t_prior >= 0 && icp->s_prior == S_MARK && icp->t_prior < t_req)) {
+ if (forw == 0 && !done_roll) {
+ done_roll = 1;
+ if (ctxp->c_delta < 0) {
+ /* backwards before scanning forwards */
+ do_roll(ctxp, t_req);
+ goto retry_forw;
+ }
+ }
+ forw++;
+ icp->search = 1;
+ icp->unbound = (instcntl_t *)ctxp->c_archctl->ac_unbound;
+ ctxp->c_archctl->ac_unbound = icp;
+#ifdef PCP_DEBUG
+ if (pmDebug & DBG_TRACE_INTERP) {
+ char strbuf[20];
+ fprintf(stderr, "search forw for inst %d and pmid %s (t_first=%.6f t_prior=%.6f%s t_next=%.6f%s t_last=%.6f)\n",
+ icp->inst, pmIDStr_r(pmidlist[j], strbuf, sizeof(strbuf)), icp->t_first,
+ icp->t_prior, statestr[icp->s_prior],
+ icp->t_next, statestr[icp->s_next],
+ icp->t_last);
+ }
+#endif
+ }
+ }
+ }
+ }
+
+ if (forw) {
+ /*
+ * at least one metric requires a bound from later in the log ...
+ * position ourselves ... and search
+ */
+ __pmLogChangeVol(ctxp->c_archctl->ac_log, ctxp->c_archctl->ac_vol);
+ fseek(ctxp->c_archctl->ac_log->l_mfp, ctxp->c_archctl->ac_offset, SEEK_SET);
+ done = 0;
+
+ while (done < forw) {
+ if (cache_read(ctxp->c_archctl, PM_MODE_FORW, &logrp) < 0) {
+ /* ran into end of log */
+#ifdef PCP_DEBUG
+ if (pmDebug & DBG_TRACE_INTERP) {
+ fprintf(stderr, "End of Log, %d metric-inst not found\n",
+ forw - done);
+ }
+#endif
+ break;
+ }
+ tmp.tv_sec = (__int32_t)logrp->timestamp.tv_sec;
+ tmp.tv_usec = (__int32_t)logrp->timestamp.tv_usec;
+ t_this = __pmTimevalSub(&tmp, &ctxp->c_archctl->ac_log->l_label.ill_start);
+ if (ctxp->c_delta > 0 && t_this <= t_req) {
+ /* going forwards, and not up to t_req yet */
+ ctxp->c_archctl->ac_offset = ftell(ctxp->c_archctl->ac_log->l_mfp);
+ assert(ctxp->c_archctl->ac_offset >= 0);
+ ctxp->c_archctl->ac_vol = ctxp->c_archctl->ac_log->l_curvol;
+ }
+ update_bounds(ctxp, t_req, logrp, UPD_MARK_FORW, NULL, &done);
+
+ /*
+ * forget about those that can never be found from here
+ * in this direction
+ */
+ for (icp = (instcntl_t *)ctxp->c_archctl->ac_unbound; icp != NULL; icp = icp->unbound) {
+ if (icp->search && icp->t_last >= 0 && t_this >= icp->t_last) {
+ icp->search = 0;
+ done++;
+ }
+ }
+ }
+ /* end of search, trim t_last as required */
+ for (icp = (instcntl_t *)ctxp->c_archctl->ac_unbound; icp != NULL; icp = icp->unbound) {
+ if (icp->t_next < t_req &&
+ (icp->t_last < 0 || t_req < icp->t_last)) {
+ icp->t_last = t_req;
+#ifdef PCP_DEBUG
+ if (pmDebug & DBG_TRACE_INTERP) {
+ char strbuf[20];
+ fprintf(stderr, "pmid %s inst %d no values after t_last=%.6f\n",
+ pmIDStr_r(icp->metric->desc.pmid, strbuf, sizeof(strbuf)), icp->inst, icp->t_last);
+ }
+#endif
+ }
+ icp->search = 0;
+ }
+ }
+
+ /*
+ * check to see how many qualifying values there are really going to be
+ */
+ for (j = 0; j < numpmid; j++) {
+ if (pmidlist[j] == PM_ID_NULL)
+ continue;
+ hp = __pmHashSearch((int)pmidlist[j], hcp);
+ assert(hp != NULL);
+ pcp = (pmidcntl_t *)hp->data;
+ for (icp = pcp->first; icp != NULL; icp = icp->next) {
+ if (!icp->inresult)
+ continue;
+ if (pcp->desc.sem == PM_SEM_DISCRETE) {
+ if (icp->s_prior == S_MARK || icp->t_prior == -1 ||
+ icp->t_prior > t_req) {
+ /* no earlier value, so no value */
+ pcp->numval--;
+ icp->inresult = 0;
+ }
+ }
+ else {
+ /* assume COUNTER or INSTANT */
+ if (icp->s_prior == S_MARK || icp->t_prior == -1 ||
+ icp->t_prior > t_req ||
+ icp->s_next == S_MARK || icp->t_next == -1 || icp->t_next < t_req) {
+ /* in mid-range, and no bound, so no value */
+ pcp->numval--;
+ icp->inresult = 0;
+ }
+ else if (pcp->desc.sem == PM_SEM_COUNTER) {
+ /*
+ * for counters, has to be arithmetic also,
+ * else cannot interpolate ...
+ */
+ if (pcp->desc.type != PM_TYPE_32 &&
+ pcp->desc.type != PM_TYPE_U32 &&
+ pcp->desc.type != PM_TYPE_64 &&
+ pcp->desc.type != PM_TYPE_U64 &&
+ pcp->desc.type != PM_TYPE_FLOAT &&
+ pcp->desc.type != PM_TYPE_DOUBLE)
+ pcp->numval = PM_ERR_TYPE;
+ }
+ }
+ }
+ }
+
+ for (j = 0; j < numpmid; j++) {
+ if (pmidlist[j] == PM_ID_NULL) {
+ rp->vset[j] = (pmValueSet *)malloc(sizeof(pmValueSet) -
+ sizeof(pmValue));
+ }
+ else {
+ hp = __pmHashSearch((int)pmidlist[j], hcp);
+ assert(hp != NULL);
+ pcp = (pmidcntl_t *)hp->data;
+
+ if (pcp->numval >= 1)
+ rp->vset[j] = (pmValueSet *)malloc(sizeof(pmValueSet) +
+ (pcp->numval - 1)*sizeof(pmValue));
+ else
+ rp->vset[j] = (pmValueSet *)malloc(sizeof(pmValueSet) -
+ sizeof(pmValue));
+ }
+
+ if (rp->vset[j] == NULL) {
+ __pmNoMem("__pmLogFetchInterp.vset", sizeof(pmValueSet), PM_FATAL_ERR);
+ }
+
+ rp->vset[j]->pmid = pmidlist[j];
+ if (pmidlist[j] == PM_ID_NULL) {
+ rp->vset[j]->numval = 0;
+ continue;
+ }
+ rp->vset[j]->numval = pcp->numval;
+ rp->vset[j]->valfmt = pcp->valfmt;
+
+ i = 0;
+ if (pcp->numval > 0) {
+ for (icp = pcp->first; icp != NULL; icp = icp->next) {
+ if (!icp->inresult)
+ continue;
+#ifdef PCP_DEBUG
+ if (pmDebug & DBG_TRACE_INTERP && done_roll) {
+ char strbuf[20];
+ fprintf(stderr, "pmid %s inst %d prior: t=%.6f",
+ pmIDStr_r(pmidlist[j], strbuf, sizeof(strbuf)), icp->inst, icp->t_prior);
+ dumpval(stderr, pcp->desc.type, icp->metric->valfmt, 1, icp);
+ fprintf(stderr, " next: t=%.6f", icp->t_next);
+ dumpval(stderr, pcp->desc.type, icp->metric->valfmt, 0, icp);
+ fprintf(stderr, " t_first=%.6f t_last=%.6f\n",
+ icp->t_first, icp->t_last);
+ }
+#endif
+ rp->vset[j]->vlist[i].inst = icp->inst;
+ if (pcp->desc.type == PM_TYPE_32 || pcp->desc.type == PM_TYPE_U32) {
+ if (icp->t_prior == t_req)
+ rp->vset[j]->vlist[i++].value.lval = icp->v_prior.lval;
+ else if (icp->t_next == t_req)
+ rp->vset[j]->vlist[i++].value.lval = icp->v_next.lval;
+ else {
+ if (pcp->desc.sem == PM_SEM_DISCRETE) {
+ if (icp->t_prior >= 0)
+ rp->vset[j]->vlist[i++].value.lval = icp->v_prior.lval;
+ }
+ else if (pcp->desc.sem == PM_SEM_INSTANT) {
+ if (icp->t_prior >= 0 && icp->t_next >= 0)
+ rp->vset[j]->vlist[i++].value.lval = icp->v_prior.lval;
+ }
+ else {
+ /* assume COUNTER */
+ if (icp->t_prior >= 0 && icp->t_next >= 0) {
+ if (pcp->desc.type == PM_TYPE_32) {
+ if (icp->v_next.lval >= icp->v_prior.lval ||
+ dowrap == 0) {
+ rp->vset[j]->vlist[i++].value.lval = 0.5 +
+ icp->v_prior.lval + (t_req - icp->t_prior) *
+ (icp->v_next.lval - icp->v_prior.lval) /
+ (icp->t_next - icp->t_prior);
+ }
+ else {
+ /* not monotonic increasing and want wrap */
+ rp->vset[j]->vlist[i].value.lval = 0.5 +
+ (t_req - icp->t_prior) *
+ (__int32_t)(UINT_MAX - icp->v_prior.lval + 1 + icp->v_next.lval) /
+ (icp->t_next - icp->t_prior);
+ rp->vset[j]->vlist[i++].value.lval += icp->v_prior.lval;
+ }
+ }
+ else {
+ pmAtomValue av;
+ pmAtomValue *avp_prior = (pmAtomValue *)&icp->v_prior.lval;
+ pmAtomValue *avp_next = (pmAtomValue *)&icp->v_next.lval;
+ if (avp_next->ul >= avp_prior->ul) {
+ av.ul = 0.5 + avp_prior->ul +
+ (t_req - icp->t_prior) *
+ (avp_next->ul - avp_prior->ul) /
+ (icp->t_next - icp->t_prior);
+ }
+ else {
+ /* not monotonic increasing */
+ if (dowrap) {
+ av.ul = 0.5 +
+ (t_req - icp->t_prior) *
+ (__uint32_t)(UINT_MAX - avp_prior->ul + 1 + avp_next->ul ) /
+ (icp->t_next - icp->t_prior);
+ av.ul += avp_prior->ul;
+ }
+ else {
+ __uint32_t tmp;
+ tmp = avp_prior->ul - avp_next->ul;
+ av.ul = 0.5 + avp_prior->ul -
+ (t_req - icp->t_prior) * tmp /
+ (icp->t_next - icp->t_prior);
+ }
+ }
+ rp->vset[j]->vlist[i++].value.lval = av.ul;
+ }
+ }
+ }
+ }
+ }
+ else if (pcp->desc.type == PM_TYPE_FLOAT && icp->metric->valfmt == PM_VAL_INSITU) {
+ /* OLD style FLOAT insitu */
+ if (icp->t_prior == t_req)
+ rp->vset[j]->vlist[i++].value.lval = icp->v_prior.lval;
+ else if (icp->t_next == t_req)
+ rp->vset[j]->vlist[i++].value.lval = icp->v_next.lval;
+ else {
+ if (pcp->desc.sem == PM_SEM_DISCRETE) {
+ if (icp->t_prior >= 0)
+ rp->vset[j]->vlist[i++].value.lval = icp->v_prior.lval;
+ }
+ else if (pcp->desc.sem == PM_SEM_INSTANT) {
+ if (icp->t_prior >= 0 && icp->t_next >= 0)
+ rp->vset[j]->vlist[i++].value.lval = icp->v_prior.lval;
+ }
+ else {
+ /* assume COUNTER */
+ pmAtomValue av;
+ pmAtomValue *avp_prior = (pmAtomValue *)&icp->v_prior.lval;
+ pmAtomValue *avp_next = (pmAtomValue *)&icp->v_next.lval;
+ if (icp->t_prior >= 0 && icp->t_next >= 0) {
+ av.f = avp_prior->f + (t_req - icp->t_prior) *
+ (avp_next->f - avp_prior->f) /
+ (icp->t_next - icp->t_prior);
+ /* yes this IS correct ... */
+ rp->vset[j]->vlist[i++].value.lval = av.l;
+ }
+ }
+ }
+ }
+ else if (pcp->desc.type == PM_TYPE_FLOAT) {
+ /* NEW style FLOAT in pmValueBlock */
+ int need;
+ pmValueBlock *vp;
+ int ok = 1;
+
+ need = PM_VAL_HDR_SIZE + sizeof(float);
+ if ((vp = (pmValueBlock *)malloc(need)) == NULL) {
+ sts = -oserror();
+ goto bad_alloc;
+ }
+ vp->vlen = need;
+ vp->vtype = PM_TYPE_FLOAT;
+ rp->vset[j]->valfmt = PM_VAL_DPTR;
+ rp->vset[j]->vlist[i++].value.pval = vp;
+ if (icp->t_prior == t_req)
+ memcpy((void *)vp->vbuf, (void *)icp->v_prior.pval->vbuf, sizeof(float));
+ else if (icp->t_next == t_req)
+ memcpy((void *)vp->vbuf, (void *)icp->v_next.pval->vbuf, sizeof(float));
+ else {
+ if (pcp->desc.sem == PM_SEM_DISCRETE) {
+ if (icp->t_prior >= 0)
+ memcpy((void *)vp->vbuf, (void *)icp->v_prior.pval->vbuf, sizeof(float));
+ else
+ ok = 0;
+ }
+ else if (pcp->desc.sem == PM_SEM_INSTANT) {
+ if (icp->t_prior >= 0 && icp->t_next >= 0)
+ memcpy((void *)vp->vbuf, (void *)icp->v_prior.pval->vbuf, sizeof(float));
+ else
+ ok = 0;
+ }
+ else {
+ /* assume COUNTER */
+ if (icp->t_prior >= 0 && icp->t_next >= 0) {
+ pmAtomValue av;
+ void *avp_prior = icp->v_prior.pval->vbuf;
+ void *avp_next = icp->v_next.pval->vbuf;
+ float f_prior;
+ float f_next;
+
+ memcpy((void *)&av.f, avp_prior, sizeof(av.f));
+ f_prior = av.f;
+ memcpy((void *)&av.f, avp_next, sizeof(av.f));
+ f_next = av.f;
+
+ av.f = f_prior + (t_req - icp->t_prior) *
+ (f_next - f_prior) /
+ (icp->t_next - icp->t_prior);
+ memcpy((void *)vp->vbuf, (void *)&av.f, sizeof(av.f));
+ }
+ else
+ ok = 0;
+ }
+ }
+ if (!ok) {
+ i--;
+ free(vp);
+ }
+ }
+ else if (pcp->desc.type == PM_TYPE_64 || pcp->desc.type == PM_TYPE_U64) {
+ int need;
+ pmValueBlock *vp;
+ int ok = 1;
+
+ need = PM_VAL_HDR_SIZE + sizeof(__int64_t);
+ if ((vp = (pmValueBlock *)malloc(need)) == NULL) {
+ sts = -oserror();
+ goto bad_alloc;
+ }
+ vp->vlen = need;
+ if (pcp->desc.type == PM_TYPE_64)
+ vp->vtype = PM_TYPE_64;
+ else
+ vp->vtype = PM_TYPE_U64;
+ rp->vset[j]->valfmt = PM_VAL_DPTR;
+ rp->vset[j]->vlist[i++].value.pval = vp;
+ if (icp->t_prior == t_req)
+ memcpy((void *)vp->vbuf, (void *)icp->v_prior.pval->vbuf, sizeof(__int64_t));
+ else if (icp->t_next == t_req)
+ memcpy((void *)vp->vbuf, (void *)icp->v_next.pval->vbuf, sizeof(__int64_t));
+ else {
+ if (pcp->desc.sem == PM_SEM_DISCRETE) {
+ if (icp->t_prior >= 0)
+ memcpy((void *)vp->vbuf, (void *)icp->v_prior.pval->vbuf, sizeof(__int64_t));
+ else
+ ok = 0;
+ }
+ else if (pcp->desc.sem == PM_SEM_INSTANT) {
+ if (icp->t_prior >= 0 && icp->t_next >= 0)
+ memcpy((void *)vp->vbuf, (void *)icp->v_prior.pval->vbuf, sizeof(__int64_t));
+ else
+ ok = 0;
+ }
+ else {
+ /* assume COUNTER */
+ if (icp->t_prior >= 0 && icp->t_next >= 0) {
+ pmAtomValue av;
+ void *avp_prior = (void *)icp->v_prior.pval->vbuf;
+ void *avp_next = (void *)icp->v_next.pval->vbuf;
+ if (pcp->desc.type == PM_TYPE_64) {
+ __int64_t ll_prior;
+ __int64_t ll_next;
+ memcpy((void *)&av.ll, avp_prior, sizeof(av.ll));
+ ll_prior = av.ll;
+ memcpy((void *)&av.ll, avp_next, sizeof(av.ll));
+ ll_next = av.ll;
+ if (ll_next >= ll_prior || dowrap == 0)
+ av.ll = ll_next - ll_prior;
+ else
+ /* not monotonic increasing and want wrap */
+ av.ll = (__int64_t)(ULONGLONG_MAX - ll_prior + 1 + ll_next);
+ av.ll = (__int64_t)(0.5 + (double)ll_prior +
+ (t_req - icp->t_prior) * (double)av.ll / (icp->t_next - icp->t_prior));
+ memcpy((void *)vp->vbuf, (void *)&av.ll, sizeof(av.ll));
+ }
+ else {
+ __int64_t ull_prior;
+ __int64_t ull_next;
+ memcpy((void *)&av.ull, avp_prior, sizeof(av.ull));
+ ull_prior = av.ull;
+ memcpy((void *)&av.ull, avp_next, sizeof(av.ull));
+ ull_next = av.ull;
+ if (ull_next >= ull_prior) {
+ av.ull = ull_next - ull_prior;
+#if !defined(HAVE_CAST_U64_DOUBLE)
+ {
+ double tmp;
+
+ if (SIGN_64_MASK & av.ull)
+ tmp = (double)(__int64_t)(av.ull & (~SIGN_64_MASK)) + (__uint64_t)SIGN_64_MASK;
+ else
+ tmp = (double)(__int64_t)av.ull;
+
+ av.ull = (__uint64_t)(0.5 + (double)ull_prior +
+ (t_req - icp->t_prior) * tmp /
+ (icp->t_next - icp->t_prior));
+ }
+#else
+ av.ull = (__uint64_t)(0.5 + (double)ull_prior +
+ (t_req - icp->t_prior) * (double)av.ull /
+ (icp->t_next - icp->t_prior));
+#endif
+ }
+ else {
+ /* not monotonic increasing */
+ if (dowrap) {
+ av.ull = ULONGLONG_MAX - ull_prior + 1 +
+ ull_next;
+#if !defined(HAVE_CAST_U64_DOUBLE)
+ {
+ double tmp;
+
+ if (SIGN_64_MASK & av.ull)
+ tmp = (double)(__int64_t)(av.ull & (~SIGN_64_MASK)) + (__uint64_t)SIGN_64_MASK;
+ else
+ tmp = (double)(__int64_t)av.ull;
+
+ av.ull = (__uint64_t)(0.5 + (double)ull_prior +
+ (t_req - icp->t_prior) * tmp /
+ (icp->t_next - icp->t_prior));
+ }
+#else
+ av.ull = (__uint64_t)(0.5 + (double)ull_prior +
+ (t_req - icp->t_prior) * (double)av.ull /
+ (icp->t_next - icp->t_prior));
+#endif
+ }
+ else {
+ __uint64_t tmp;
+ tmp = ull_prior - ull_next;
+#if !defined(HAVE_CAST_U64_DOUBLE)
+ {
+ double xtmp;
+
+ if (SIGN_64_MASK & tmp)
+ xtmp = (double)(__int64_t)(tmp & (~SIGN_64_MASK)) + (__uint64_t)SIGN_64_MASK;
+ else
+ xtmp = (double)(__int64_t)tmp;
+
+ av.ull = (__uint64_t)(0.5 + (double)ull_prior -
+ (t_req - icp->t_prior) * xtmp /
+ (icp->t_next - icp->t_prior));
+ }
+#else
+ av.ull = (__uint64_t)(0.5 + (double)ull_prior -
+ (t_req - icp->t_prior) * (double)tmp /
+ (icp->t_next - icp->t_prior));
+#endif
+ }
+ }
+ memcpy((void *)vp->vbuf, (void *)&av.ull, sizeof(av.ull));
+ }
+ }
+ else
+ ok = 0;
+ }
+ }
+ if (!ok) {
+ i--;
+ free(vp);
+ }
+ }
+ else if (pcp->desc.type == PM_TYPE_DOUBLE) {
+ int need;
+ pmValueBlock *vp;
+ int ok = 1;
+
+ need = PM_VAL_HDR_SIZE + sizeof(double);
+ if ((vp = (pmValueBlock *)malloc(need)) == NULL) {
+ sts = -oserror();
+ goto bad_alloc;
+ }
+ vp->vlen = need;
+ vp->vtype = PM_TYPE_DOUBLE;
+ rp->vset[j]->valfmt = PM_VAL_DPTR;
+ rp->vset[j]->vlist[i++].value.pval = vp;
+ if (icp->t_prior == t_req)
+ memcpy((void *)vp->vbuf, (void *)icp->v_prior.pval->vbuf, sizeof(double));
+ else if (icp->t_next == t_req)
+ memcpy((void *)vp->vbuf, (void *)icp->v_next.pval->vbuf, sizeof(double));
+ else {
+ if (pcp->desc.sem == PM_SEM_DISCRETE) {
+ if (icp->t_prior >= 0)
+ memcpy((void *)vp->vbuf, (void *)icp->v_prior.pval->vbuf, sizeof(double));
+ else
+ ok = 0;
+ }
+ else if (pcp->desc.sem == PM_SEM_INSTANT) {
+ if (icp->t_prior >= 0 && icp->t_next >= 0)
+ memcpy((void *)vp->vbuf, (void *)icp->v_prior.pval->vbuf, sizeof(double));
+ else
+ ok = 0;
+ }
+ else {
+ /* assume COUNTER */
+ if (icp->t_prior >= 0 && icp->t_next >= 0) {
+ pmAtomValue av;
+ void *avp_prior = (void *)icp->v_prior.pval->vbuf;
+ void *avp_next = (void *)icp->v_next.pval->vbuf;
+ double d_prior;
+ double d_next;
+ memcpy((void *)&av.d, avp_prior, sizeof(av.d));
+ d_prior = av.d;
+ memcpy((void *)&av.d, avp_next, sizeof(av.d));
+ d_next = av.d;
+ av.d = d_prior + (t_req - icp->t_prior) *
+ (d_next - d_prior) /
+ (icp->t_next - icp->t_prior);
+ memcpy((void *)vp->vbuf, (void *)&av.d, sizeof(av.d));
+ }
+ else
+ ok = 0;
+ }
+ }
+ if (!ok) {
+ i--;
+ free(vp);
+ }
+ }
+ else if ((pcp->desc.type == PM_TYPE_AGGREGATE ||
+ pcp->desc.type == PM_TYPE_EVENT ||
+ pcp->desc.type == PM_TYPE_HIGHRES_EVENT ||
+ pcp->desc.type == PM_TYPE_STRING) &&
+ icp->t_prior >= 0) {
+ int need;
+ pmValueBlock *vp;
+
+ need = icp->v_prior.pval->vlen;
+
+ vp = (pmValueBlock *)malloc(need);
+ if (vp == NULL) {
+ sts = -oserror();
+ goto bad_alloc;
+ }
+ rp->vset[j]->valfmt = PM_VAL_DPTR;
+ rp->vset[j]->vlist[i++].value.pval = vp;
+ memcpy((void *)vp, icp->v_prior.pval, need);
+ }
+ else {
+ /* unknown type - skip it, else junk in result */
+ i--;
+ }
+ }
+ }
+ }
+
+ *result = rp;
+ sts = 0;
+
+all_done:
+ pmXTBdeltaToTimeval(ctxp->c_delta, ctxp->c_mode, &delta_tv);
+ ctxp->c_origin.tv_sec += delta_tv.tv_sec;
+ ctxp->c_origin.tv_usec += delta_tv.tv_usec;
+ while (ctxp->c_origin.tv_usec > 1000000) {
+ ctxp->c_origin.tv_sec++;
+ ctxp->c_origin.tv_usec -= 1000000;
+ }
+ while (ctxp->c_origin.tv_usec < 0) {
+ ctxp->c_origin.tv_sec--;
+ ctxp->c_origin.tv_usec += 1000000;
+ }
+
+#ifdef PCP_DEBUG
+ if (pmDebug & DBG_TRACE_INTERP) {
+ fprintf(stderr, "__pmLogFetchInterp: log reads: forward %ld",
+ nr[PM_MODE_FORW]);
+ if (nr_cache[PM_MODE_FORW])
+ fprintf(stderr, " (+%ld cached)", nr_cache[PM_MODE_FORW]);
+ fprintf(stderr, " backwards %ld",
+ nr[PM_MODE_BACK]);
+ if (nr_cache[PM_MODE_BACK])
+ fprintf(stderr, " (+%ld cached)", nr_cache[PM_MODE_BACK]);
+ fprintf(stderr, "\n");
+ }
+#endif
+
+ return sts;
+
+bad_alloc:
+ /*
+ * leaks a little (vlist[] stuff) ... but it does not really matter at
+ * this point, the chance of anything good happening from here on is
+ * pretty remote
+ */
+ rp->vset[j]->numval = i;
+ while (++j < numpmid)
+ rp->vset[j]->numval = 0;
+ pmFreeResult(rp);
+
+ return sts;
+
+}
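
/*
 * Editorial sketch, not part of the commit, with hypothetical numbers:
 * the PCP_COUNTER_WRAP arithmetic used above for 32-bit counters.  If
 * the counter was 100 short of UINT_MAX at t_prior=0 and reads 400 at
 * t_next=10, the wrapped distance travelled is 500, so at t_req=5 the
 * interpolated value is v_prior + 250, which itself wraps to 150.
 */
static __uint32_t
example_wrap32(void)
{
    double      t_prior = 0.0, t_next = 10.0, t_req = 5.0;
    __uint32_t  v_prior = UINT_MAX - 99;
    __uint32_t  v_next = 400;
    /* distance travelled across the wrap, as in the dowrap branches above */
    __uint32_t  span = UINT_MAX - v_prior + 1 + v_next;         /* 500 */
    /* scale by elapsed time, then add back v_prior (unsigned add wraps) */
    __uint32_t  delta = (__uint32_t)(0.5 + (t_req - t_prior) * span /
                                           (t_next - t_prior)); /* 250 */

    return v_prior + delta;     /* 150 */
}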
+
+void
+__pmLogResetInterp(__pmContext *ctxp)
+{
+ __pmHashCtl *hcp = &ctxp->c_archctl->ac_pmid_hc;
+ double t_req;
+ __pmHashNode *hp;
+ int k;
+ pmidcntl_t *pcp;
+ instcntl_t *icp;
+
+ if (hcp->hsize == 0)
+ return;
+
+ t_req = __pmTimevalSub(&ctxp->c_origin, &ctxp->c_archctl->ac_log->l_label.ill_start);
+
+ for (k = 0; k < hcp->hsize; k++) {
+ for (hp = hcp->hash[k]; hp != NULL; hp = hp->next) {
+ pcp = (pmidcntl_t *)hp->data;
+ for (icp = pcp->first; icp != NULL; icp = icp->next) {
+ if (icp->t_prior > t_req || icp->t_next < t_req) {
+ icp->t_prior = icp->t_next = -1;
+ if (pcp->valfmt != PM_VAL_INSITU) {
+ if (icp->v_prior.pval != NULL)
+ __pmUnpinPDUBuf((void *)icp->v_prior.pval);
+ if (icp->v_next.pval != NULL)
+ __pmUnpinPDUBuf((void *)icp->v_next.pval);
+ }
+ icp->v_prior.pval = icp->v_next.pval = NULL;
+ }
+ }
+ }
+ }
+}
+
+/*
+ * Free interp data when context is closed ...
+ * - pinned PDU buffers holding values used for interpolation
+ * - hash structures for finding metrics and instances
+ * - read_cache contents
+ *
+ * Called with ctxp->c_lock held.
+ */
+void
+__pmFreeInterpData(__pmContext *ctxp)
+{
+ if (ctxp->c_archctl->ac_pmid_hc.hsize > 0) {
+ /* we have done some interpolation ... */
+ __pmHashCtl *hcp = &ctxp->c_archctl->ac_pmid_hc;
+ __pmHashNode *hp;
+ pmidcntl_t *pcp;
+ instcntl_t *icp;
+ int j;
+
+ for (j = 0; j < hcp->hsize; j++) {
+ __pmHashNode *last_hp = NULL;
+ /*
+ * Don't free __pmHashNode until hp->next has been traversed,
+ * hence free lags one node in the chain (last_hp used for free).
+ * Same for linked list of instcntl_t structs (use last_icp
+ * for free in this case).
+ */
+ for (hp = hcp->hash[j]; hp != NULL; hp = hp->next) {
+ instcntl_t *last_icp = NULL;
+ pcp = (pmidcntl_t *)hp->data;
+ for (icp = pcp->first; icp != NULL; icp = icp->next) {
+ if (pcp->valfmt != PM_VAL_INSITU) {
+ /*
+ * Held values may be in PDU buffers, unpin the PDU
+ * buffers just in case (__pmUnpinPDUBuf is a NOP if
+ * the value is not in a PDU buffer)
+ */
+ if (icp->v_prior.pval != NULL) {
+#ifdef PCP_DEBUG
+ if ((pmDebug & DBG_TRACE_INTERP) && (pmDebug & DBG_TRACE_DESPERATE)) {
+ char strbuf[20];
+ fprintf(stderr, "release pmid %s inst %d prior\n",
+ pmIDStr_r(pcp->desc.pmid, strbuf, sizeof(strbuf)), icp->inst);
+ }
+#endif
+ __pmUnpinPDUBuf((void *)icp->v_prior.pval);
+ }
+ if (icp->v_next.pval != NULL) {
+#ifdef PCP_DEBUG
+ if ((pmDebug & DBG_TRACE_INTERP) && (pmDebug & DBG_TRACE_DESPERATE)) {
+ char strbuf[20];
+ fprintf(stderr, "release pmid %s inst %d next\n",
+ pmIDStr_r(pcp->desc.pmid, strbuf, sizeof(strbuf)), icp->inst);
+ }
+#endif
+ __pmUnpinPDUBuf((void *)icp->v_next.pval);
+ }
+ }
+ if (last_icp != NULL)
+ free(last_icp);
+ last_icp = icp;
+ }
+ if (last_icp != NULL)
+ free(last_icp);
+ if (last_hp != NULL) {
+ if (last_hp->data != NULL)
+ free(last_hp->data);
+ free(last_hp);
+ }
+ last_hp = hp;
+ }
+ if (last_hp != NULL) {
+ if (last_hp->data != NULL)
+ free(last_hp->data);
+ free(last_hp);
+ }
+ }
+ free(hcp->hash);
+ /* just being paranoid here */
+ hcp->hash = NULL;
+ hcp->hsize = 0;
+ }
+
+ if (ctxp->c_archctl->ac_cache != NULL) {
+ /* read cache allocated, work to be done */
+ cache_t *cache = (cache_t *)ctxp->c_archctl->ac_cache;
+ cache_t *cp;
+
+ for (cp = cache; cp < &cache[NUMCACHE]; cp++) {
+#ifdef PCP_DEBUG
+ if ((pmDebug & DBG_TRACE_LOG) && (pmDebug & DBG_TRACE_INTERP)) {
+ fprintf(stderr, "read cache entry " PRINTF_P_PFX "%p: mfp=" PRINTF_P_PFX "%p rp=" PRINTF_P_PFX "%p\n", cp, cp->mfp, cp->rp);
+ }
+#endif
+ if (cp->mfp == ctxp->c_archctl->ac_log->l_mfp) {
+ if (cp->rp != NULL)
+ pmFreeResult(cp->rp);
+ cp->rp = NULL;
+ cp->mfp = NULL;
+ cp->used = 0;
+ }
+ }
+ }
+}
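
/*
 * Editorial sketch, not part of the commit: how an application reaches
 * the interpolation code in this file.  The archive name and metric
 * name are hypothetical; error handling is minimal.
 */
#include <pcp/pmapi.h>

int
example_fetch_interp(void)
{
    char        *name = "kernel.all.load";
    pmID        pmid;
    pmLogLabel  label;
    pmResult    *rp;
    int         sts;

    if ((sts = pmNewContext(PM_CONTEXT_ARCHIVE, "myarchive")) < 0)
        return sts;
    if ((sts = pmLookupName(1, &name, &pmid)) < 0)
        return sts;
    if ((sts = pmGetArchiveLabel(&label)) < 0)
        return sts;

    /*
     * interpolated mode, sampling every 5000 msec from the start of the
     * archive ... each pmFetch() below ends up in __pmLogFetchInterp()
     */
    if ((sts = pmSetMode(PM_MODE_INTERP, &label.ll_start, 5000)) < 0)
        return sts;

    while ((sts = pmFetch(1, &pmid, &rp)) >= 0) {
        /* use rp->vset[0] ... */
        pmFreeResult(rp);
    }
    return (sts == PM_ERR_EOL) ? 0 : sts;
}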