summaryrefslogtreecommitdiff
path: root/src/libpcp/src/AF.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/libpcp/src/AF.c')
-rw-r--r--src/libpcp/src/AF.c511
1 files changed, 511 insertions, 0 deletions
diff --git a/src/libpcp/src/AF.c b/src/libpcp/src/AF.c
new file mode 100644
index 0000000..b2cb34b
--- /dev/null
+++ b/src/libpcp/src/AF.c
@@ -0,0 +1,511 @@
+/*
+ * Copyright (c) 1995-2001,2003 Silicon Graphics, Inc. All Rights Reserved.
+ * Copyright (c) 2009 Aconex. All Rights Reserved.
+ *
+ * This library is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as published
+ * by the Free Software Foundation; either version 2.1 of the License, or
+ * (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
+ * License for more details.
+ */
+
+/*
+ * general purpose asynchronous event management
+ */
+
+#include "pmapi.h"
+#include "impl.h"
+
+#define MIN_ITIMER_USEC 100
+
+typedef struct _qelt {
+ struct _qelt *q_next;
+ int q_afid;
+ struct timeval q_when;
+ struct timeval q_delta;
+ void *q_data;
+ void (*q_func)(int afid, void *data);
+} qelt;
+
+static qelt *root;
+static int afid = 0x8000;
+static int block;
+static void onalarm(int);
+
+/*
+ * Platform dependent routines follow, Windows is very different
+ * to POSIX platforms in terms of signals and timers. Note - we
+ * attempted to use CreateTimerQueue API on Windows, but it does
+ * not behave in the way we'd like unfortunately (QA slow_af.c -
+ * shows quite different results & its un-debuggable cos its all
+ * below the Win32 API).
+ */
+#ifdef IS_MINGW
+VOID CALLBACK ontimer(LPVOID arg, DWORD lo, DWORD hi)
+{
+ onalarm(14); /* 14 == POSIX SIGALRM */
+}
+
+static HANDLE afblock; /* mutex protecting callback */
+static HANDLE aftimer; /* equivalent to ITIMER_REAL */
+static int afsetup; /* one-time-setup: done flag */
+
+static void AFsetup(void)
+{
+ PM_LOCK(__pmLock_libpcp);
+ if (afsetup) {
+ PM_UNLOCK(__pmLock_libpcp);
+ return;
+ }
+ afsetup = 1;
+ afblock = CreateMutex(NULL, FALSE, NULL);
+ aftimer = CreateWaitableTimer(NULL, TRUE, NULL);
+ PM_UNLOCK(__pmLock_libpcp);
+}
+static void AFhold(void)
+{
+ AFsetup();
+ WaitForSingleObject(afblock, INFINITE);
+}
+static void AFrelse(void)
+{
+ if (afsetup)
+ ReleaseMutex(afblock);
+}
+static void AFrearm(void)
+{
+ /* do nothing, callback is always "armed" (except when not setup) */
+}
+
+static void AFsetitimer(struct timeval *interval)
+{
+ LARGE_INTEGER duetime;
+ long long inc;
+
+ AFsetup();
+
+ inc = interval->tv_sec * 10000000ULL; /* sec -> 100 nsecs */
+ inc += (interval->tv_usec * 10ULL); /* used -> 100 nsecs */
+ if (inc > 0) /* negative is relative, positive absolute */
+ inc = -inc; /* we will always want this to be relative */
+ duetime.QuadPart = inc;
+ SetWaitableTimer(aftimer, &duetime, 0, ontimer, NULL, FALSE);
+}
+
+#else /* POSIX */
+static void AFsetitimer(struct timeval *interval)
+{
+ struct itimerval val;
+
+ val.it_value = *interval;
+ val.it_interval.tv_sec = val.it_interval.tv_usec = 0;
+ setitimer(ITIMER_REAL, &val, NULL);
+}
+
+#if !defined(HAVE_SIGHOLD)
+static int
+sighold(int sig)
+{
+ sigset_t s;
+
+ sigemptyset(&s);
+ sigaddset(&s, sig);
+ sigprocmask(SIG_BLOCK, &s, NULL);
+
+ return 0;
+}
+#else
+/*
+ * for Linux the prototype is hidden, even though the function is in
+ * libc
+ */
+extern int sighold(int);
+#endif
+
+#if !defined(HAVE_SIGRELSE)
+static int
+sigrelse(int sig)
+{
+ sigset_t s;
+
+ sigemptyset(&s);
+ sigaddset(&s, sig);
+ sigprocmask(SIG_UNBLOCK, &s, NULL);
+ return 0;
+}
+#else
+/*
+ * for Linux the prototype is hidden, even though the function is in
+ * libc
+ */
+extern int sigrelse(int);
+#endif
+
+static void AFhold(void) { sighold(SIGALRM); }
+static void AFrelse(void) { sigrelse(SIGALRM); }
+static void AFrearm(void) { signal(SIGALRM, onalarm); }
+
+#endif /* POSIX */
+
+
+/*
+ * Platform independent code follows
+ */
+
+#ifdef PCP_DEBUG
+static void
+printdelta(FILE *f, struct timeval *tp)
+{
+ struct tm *tmp;
+ time_t tt = (time_t)tp->tv_sec;
+
+ tmp = gmtime(&tt);
+ fprintf(stderr, "%02d:%02d:%02d.%06ld", tmp->tm_hour, tmp->tm_min, tmp->tm_sec, (long)tp->tv_usec);
+}
+#endif
+/*
+ * a := a + b for struct timevals
+ */
+static void
+tadd(struct timeval *a, struct timeval *b)
+{
+ a->tv_usec += b->tv_usec;
+ if (a->tv_usec > 1000000) {
+ a->tv_usec -= 1000000;
+ a->tv_sec++;
+ }
+ a->tv_sec += b->tv_sec;
+}
+
+/*
+ * a := a - b for struct timevals
+ */
+static void
+tsub_real(struct timeval *a, struct timeval *b)
+{
+ a->tv_usec -= b->tv_usec;
+ if (a->tv_usec < 0) {
+ a->tv_usec += 1000000;
+ a->tv_sec--;
+ }
+ a->tv_sec -= b->tv_sec;
+}
+
+/*
+ * a := a - b for struct timevals, but result is never less than zero
+ */
+static void
+tsub(struct timeval *a, struct timeval *b)
+{
+ tsub_real(a, b);
+ if (a->tv_sec < 0) {
+ /* clip negative values at zero */
+ a->tv_sec = 0;
+ a->tv_usec = 0;
+ }
+}
+
+/*
+ * a : b for struct timevals ... <0 for a<b, ==0 for a==b, >0 for a>b
+ */
+static int
+tcmp(struct timeval *a, struct timeval *b)
+{
+ int res;
+ res = (int)(a->tv_sec - b->tv_sec);
+ if (res == 0)
+ res = (int)(a->tv_usec - b->tv_usec);
+ return res;
+}
+
+static void
+enqueue(qelt *qp)
+{
+ qelt *tqp;
+ qelt *priorp;
+
+#ifdef PCP_DEBUG
+ if (pmDebug & DBG_TRACE_AF) {
+ struct timeval now;
+
+ __pmtimevalNow(&now);
+ __pmPrintStamp(stderr, &now);
+ fprintf(stderr, " AFenqueue " PRINTF_P_PFX "%p(%d, " PRINTF_P_PFX "%p) for ",
+ qp->q_func, qp->q_afid, qp->q_data);
+ __pmPrintStamp(stderr, &qp->q_when);
+ fputc('\n', stderr);
+ }
+#endif
+
+ for (tqp = root, priorp = NULL;
+ tqp != NULL && tcmp(&qp->q_when, &tqp->q_when) >= 0;
+ tqp = tqp->q_next)
+ priorp = tqp;
+
+ if (priorp == NULL) {
+ qp->q_next = root;
+ root = qp;
+ }
+ else {
+ qp->q_next = priorp->q_next;
+ priorp->q_next = qp;
+ }
+}
+
+static void
+onalarm(int dummy)
+{
+ struct timeval now;
+ struct timeval tmp;
+ struct timeval interval;
+ qelt *qp;
+
+ if (!block)
+ AFhold();
+
+#ifdef PCP_DEBUG
+ if (pmDebug & DBG_TRACE_AF) {
+ __pmtimevalNow(&now);
+ __pmPrintStamp(stderr, &now);
+ fprintf(stderr, " AFonalarm(%d)\n", dummy);
+ }
+#endif
+ if (root != NULL) {
+ /* something to do ... */
+ while (root != NULL) {
+ /* compute difference between scheduled time and now */
+ __pmtimevalNow(&now);
+ tmp = root->q_when;
+ tsub(&tmp, &now);
+ if (tmp.tv_sec == 0 && tmp.tv_usec <= 10000) {
+ /*
+ * within one 10msec tick, the time has passed for this one,
+ * execute the callback and reschedule
+ */
+
+ qp = root;
+ root = root->q_next;
+#ifdef PCP_DEBUG
+ if (pmDebug & DBG_TRACE_AF) {
+ __pmPrintStamp(stderr, &now);
+ fprintf(stderr, " AFcallback " PRINTF_P_PFX "%p(%d, " PRINTF_P_PFX "%p)\n",
+ qp->q_func, qp->q_afid, qp->q_data);
+ }
+#endif
+ qp->q_func(qp->q_afid, qp->q_data);
+
+ if (qp->q_delta.tv_sec == 0 && qp->q_delta.tv_usec == 0) {
+ /*
+ * if delta is zero, this is a single-shot event,
+ * so do not reschedule it
+ */
+ free(qp);
+ }
+ else {
+ /*
+ * avoid falling too far behind
+ * if the scheduled time is more than q_delta in the
+ * past we will never catch up ... better to skip some
+ * events to catch up ...
+ *
+ * <------------ next q_when range ----------------->
+ *
+ * cannot catchup | may catchup | on schedule
+ * | |
+ * --------------------|---------------|------------> time
+ * | |
+ * | +-- now
+ * +-- now - q_delta
+ */
+ __pmtimevalNow(&now);
+ for ( ; ; ) {
+ tadd(&qp->q_when, &qp->q_delta);
+ tmp = qp->q_when;
+ tsub_real(&tmp, &now);
+ tadd(&tmp, &qp->q_delta);
+ if (tmp.tv_sec >= 0)
+ break;
+#ifdef PCP_DEBUG
+ if (pmDebug & DBG_TRACE_AF) {
+ __pmPrintStamp(stderr, &now);
+ fprintf(stderr, " AFcallback event %d too slow, skip callback for ", qp->q_afid);
+ __pmPrintStamp(stderr, &qp->q_when);
+ fputc('\n', stderr);
+ }
+#endif
+ }
+ enqueue(qp);
+ }
+ }
+ else
+ /*
+ * head of the queue (and hence all others) are too far in
+ * the future ... done for this time
+ */
+ break;
+ }
+
+ if (root == NULL) {
+ pmprintf("Warning: AF event queue is empty, nothing more will be scheduled\n");
+ pmflush();
+ }
+ else {
+ /* set itimer for head of queue */
+ interval = root->q_when;
+ __pmtimevalNow(&now);
+ tsub(&interval, &now);
+ if (interval.tv_sec == 0 && interval.tv_usec < MIN_ITIMER_USEC)
+ /* use minimal delay (platform dependent) */
+ interval.tv_usec = MIN_ITIMER_USEC;
+#ifdef PCP_DEBUG
+ if (pmDebug & DBG_TRACE_AF) {
+ __pmPrintStamp(stderr, &now);
+ fprintf(stderr, " AFsetitimer for delta ");
+ printdelta(stderr, &interval);
+ fputc('\n', stderr);
+ }
+#endif
+ AFsetitimer(&interval);
+ }
+ }
+ if (!block) {
+ AFrearm();
+ AFrelse();
+ }
+}
+
+int
+__pmAFregister(const struct timeval *delta, void *data, void (*func)(int, void *))
+{
+ qelt *qp;
+ struct timeval now;
+ struct timeval interval;
+
+ if (PM_MULTIPLE_THREADS(PM_SCOPE_AF))
+ return PM_ERR_THREAD;
+
+ if (!block)
+ AFhold();
+ if (afid == 0x8000 && !block) /* first time */
+ AFrearm();
+ if ((qp = (qelt *)malloc(sizeof(qelt))) == NULL) {
+ return -oserror();
+ }
+ qp->q_afid = ++afid;
+ qp->q_data = data;
+ qp->q_delta = *delta;
+ qp->q_func = func;
+ __pmtimevalNow(&qp->q_when);
+ tadd(&qp->q_when, &qp->q_delta);
+
+ enqueue(qp);
+ if (root == qp) {
+ /* we ended up at the head of the list, set itimer */
+ interval = qp->q_when;
+ __pmtimevalNow(&now);
+ tsub(&interval, &now);
+
+ if (interval.tv_sec == 0 && interval.tv_usec < MIN_ITIMER_USEC)
+ /* use minimal delay (platform dependent) */
+ interval.tv_usec = MIN_ITIMER_USEC;
+
+#ifdef PCP_DEBUG
+ if (pmDebug & DBG_TRACE_AF) {
+ __pmPrintStamp(stderr, &now);
+ fprintf(stderr, " AFsetitimer for delta ");
+ printdelta(stderr, &interval);
+ fputc('\n', stderr);
+ }
+#endif
+ AFsetitimer(&interval);
+ }
+
+ if (!block)
+ AFrelse();
+ return qp->q_afid;
+}
+
+int
+__pmAFunregister(int afid)
+{
+ qelt *qp;
+ qelt *priorp;
+ struct timeval now;
+ struct timeval interval;
+
+ if (PM_MULTIPLE_THREADS(PM_SCOPE_AF))
+ return PM_ERR_THREAD;
+
+ if (!block)
+ AFhold();
+ for (qp = root, priorp = NULL; qp != NULL && qp->q_afid != afid; qp = qp->q_next)
+ priorp = qp;
+
+ if (qp == NULL) {
+ if (!block)
+ AFrelse();
+ return -1;
+ }
+
+ if (priorp == NULL) {
+ root = qp->q_next;
+ if (root != NULL) {
+ /*
+ * we removed the head of the queue, set itimer for the
+ * new head of queue
+ */
+ interval = root->q_when;
+ __pmtimevalNow(&now);
+ tsub(&interval, &now);
+ if (interval.tv_sec == 0 && interval.tv_usec < MIN_ITIMER_USEC)
+ /* use minimal delay (platform dependent) */
+ interval.tv_usec = MIN_ITIMER_USEC;
+#ifdef PCP_DEBUG
+ if (pmDebug & DBG_TRACE_AF) {
+ __pmPrintStamp(stderr, &now);
+ fprintf(stderr, " AFsetitimer for delta ");
+ printdelta(stderr, &interval);
+ fputc('\n', stderr);
+ }
+#endif
+ AFsetitimer(&interval);
+ }
+ }
+ else
+ priorp->q_next = qp->q_next;
+
+ free(qp);
+
+ if (!block)
+ AFrelse();
+ return 0;
+}
+
+void
+__pmAFblock(void)
+{
+ if (PM_MULTIPLE_THREADS(PM_SCOPE_AF))
+ return;
+ block = 1;
+ AFhold();
+}
+
+void
+__pmAFunblock(void)
+{
+ if (PM_MULTIPLE_THREADS(PM_SCOPE_AF))
+ return;
+ block = 0;
+ AFrearm();
+ AFrelse();
+}
+
+int
+__pmAFisempty(void)
+{
+ return (root==NULL);
+}