diff options
Diffstat (limited to 'src/perl/PMDA/local.c')
-rw-r--r-- | src/perl/PMDA/local.c | 468 |
1 files changed, 468 insertions, 0 deletions
diff --git a/src/perl/PMDA/local.c b/src/perl/PMDA/local.c new file mode 100644 index 0000000..03a67fa --- /dev/null +++ b/src/perl/PMDA/local.c @@ -0,0 +1,468 @@ +/* + * Copyright (c) 2012-2014 Red Hat. + * Copyright (c) 2008-2011 Aconex. All Rights Reserved. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * for more details. + */ + +#include "local.h" +#include <dirent.h> +#include <search.h> +#include <sys/stat.h> +#ifdef HAVE_PWD_H +#include <pwd.h> +#endif + +static timers_t *timers; +static int ntimers; +static files_t *files; +static int nfiles; + +char * +local_strdup_suffix(const char *string, const char *suffix) +{ + size_t length = strlen(string) + strlen(suffix) + 1; + char *result = malloc(length); + + if (!result) + return result; + sprintf(result, "%s%s", string, suffix); + return result; +} + +char * +local_strdup_prefix(const char *prefix, const char *string) +{ + size_t length = strlen(prefix) + strlen(string) + 1; + char *result = malloc(length); + + if (!result) + return result; + sprintf(result, "%s%s", prefix, string); + return result; +} + +int +local_timer(double timeout, scalar_t *callback, int cookie) +{ + int size = sizeof(*timers) * (ntimers + 1); + delta_t delta; + + delta.tv_sec = (time_t)timeout; + delta.tv_usec = (long)((timeout - (double)delta.tv_sec) * 1000000.0); + + if ((timers = realloc(timers, size)) == NULL) + __pmNoMem("timers resize", size, PM_FATAL_ERR); + timers[ntimers].id = -1; /* not yet registered */ + timers[ntimers].delta = delta; + timers[ntimers].cookie = cookie; + timers[ntimers].callback = callback; + return ntimers++; +} + +int +local_timer_get_cookie(int id) +{ + int i; + + for (i = 0; i < ntimers; i++) + if (timers[i].id == id) + return timers[i].cookie; + return -1; +} + +scalar_t * +local_timer_get_callback(int id) +{ + int i; + + for (i = 0; i < ntimers; i++) + if (timers[i].id == id) + return timers[i].callback; + return NULL; +} + +static int +local_file(int type, int fd, scalar_t *callback, int cookie) +{ + int size = sizeof(*files) * (nfiles + 1); + + if ((files = realloc(files, size)) == NULL) + __pmNoMem("files resize", size, PM_FATAL_ERR); + files[nfiles].type = type; + files[nfiles].fd = fd; + files[nfiles].cookie = cookie; + files[nfiles].callback = callback; + return nfiles++; +} + +int +local_pipe(char *pipe, scalar_t *callback, int cookie) +{ + FILE *fp = popen(pipe, "r"); + int me; + +#if defined(HAVE_SIGPIPE) + signal(SIGPIPE, SIG_IGN); +#endif + if (!fp) { + __pmNotifyErr(LOG_ERR, "popen failed (%s): %s", pipe, osstrerror()); + exit(1); + } + me = local_file(FILE_PIPE, fileno(fp), callback, cookie); + files[me].me.pipe.file = fp; + return fileno(fp); +} + +int +local_tail(char *file, scalar_t *callback, int cookie) +{ + int fd = open(file, O_RDONLY | O_NDELAY); + struct stat stats; + int me; + + if (fd < 0) { + __pmNotifyErr(LOG_ERR, "open failed (%s): %s", file, osstrerror()); + exit(1); + } + if (fstat(fd, &stats) < 0) { + __pmNotifyErr(LOG_ERR, "fstat failed (%s): %s", file, osstrerror()); + exit(1); + } + lseek(fd, 0L, SEEK_END); + me = local_file(FILE_TAIL, fd, callback, cookie); + files[me].me.tail.path = strdup(file); + files[me].me.tail.dev = stats.st_dev; + files[me].me.tail.ino = stats.st_ino; + return me; +} + +int +local_sock(char *host, int port, scalar_t *callback, int cookie) +{ + __pmSockAddr *myaddr; + __pmHostEnt *servinfo = NULL; + void *enumIx; + int sts = -1; + int me, fd = -1; + + if ((servinfo = __pmGetAddrInfo(host)) == NULL) { + __pmNotifyErr(LOG_ERR, "__pmGetAddrInfo (%s): %s", host, netstrerror()); + goto error; + } + /* Loop over the addresses resolved for this host name until one of them + connects. */ + enumIx = NULL; + for (myaddr = __pmHostEntGetSockAddr(servinfo, &enumIx); + myaddr != NULL; + myaddr = __pmHostEntGetSockAddr(servinfo, &enumIx)) { + if (__pmSockAddrIsInet(myaddr)) + fd = __pmCreateSocket(); + else if (__pmSockAddrIsIPv6(myaddr)) + fd = __pmCreateIPv6Socket(); + else { + __pmNotifyErr(LOG_ERR, "invalid address family: %d\n", + __pmSockAddrGetFamily(myaddr)); + fd = -1; + } + + if (fd < 0) { + __pmSockAddrFree(myaddr); + continue; /* Try the next address */ + } + + __pmSockAddrSetPort(myaddr, port); + + sts = __pmConnect(fd, (void *)myaddr, __pmSockAddrSize()); + __pmSockAddrFree(myaddr); + if (sts == 0) + break; /* Successful connection */ + + /* Try the next address */ + __pmCloseSocket(fd); + fd = -1; + } + __pmHostEntFree(servinfo); + + if (sts < 0) { + __pmNotifyErr(LOG_ERR, "__pmConnect (%s): %s", host, netstrerror()); + goto error; + } + + me = local_file(FILE_SOCK, fd, callback, cookie); + files[me].me.sock.host = strdup(host); + files[me].me.sock.port = port; + + return me; + + error: + if (fd >= 0) + __pmCloseSocket(fd); + if (servinfo) + __pmHostEntFree(servinfo); + exit(1); +} + +static char * +local_filetype(int type) +{ + if (type == FILE_SOCK) + return "socket connection"; + if (type == FILE_PIPE) + return "command pipe"; + if (type == FILE_TAIL) + return "tailed file"; + return NULL; +} + +int +local_files_get_descriptor(int id) +{ + if (id < 0 || id >= nfiles) + return -1; + return files[id].fd; +} + +void +local_atexit(void) +{ + while (ntimers > 0) { + --ntimers; + __pmAFunregister(timers[ntimers].id); + } + if (timers) { + free(timers); + timers = NULL; + } + while (nfiles > 0) { + --nfiles; + if (files[nfiles].type == FILE_PIPE) + pclose(files[nfiles].me.pipe.file); + if (files[nfiles].type == FILE_TAIL) { + close(files[nfiles].fd); + if (files[nfiles].me.tail.path) + free(files[nfiles].me.tail.path); + files[nfiles].me.tail.path = NULL; + } + if (files[nfiles].type == FILE_SOCK) { + __pmCloseSocket(files[nfiles].fd); + if (files[nfiles].me.sock.host) + free(files[nfiles].me.sock.host); + files[nfiles].me.sock.host = NULL; + } + } + if (files) { + free(files); + files = NULL; + } + /* take out any children we created */ +#ifdef HAVE_SIGNAL + signal(SIGTERM, SIG_IGN); +#endif + __pmProcessTerminate((pid_t)0, 0); +} + +static void +local_log_rotated(files_t *file) +{ + struct stat stats; + + if (stat(file->me.tail.path, &stats) < 0) + return; + if (stats.st_ino == file->me.tail.ino && stats.st_dev == file->me.tail.dev) + return; + + close(file->fd); + file->fd = open(file->me.tail.path, O_RDONLY | O_NDELAY); + if (file->fd < 0) { + __pmNotifyErr(LOG_ERR, "open failed after log rotate (%s): %s", + file->me.tail.path, osstrerror()); + return; + } + files->me.tail.dev = stats.st_dev; + files->me.tail.ino = stats.st_ino; +} + +static void +local_reconnector(files_t *file) +{ + __pmSockAddr *myaddr = NULL; + __pmHostEnt *servinfo = NULL; + int fd = -1; + int sts = -1; + void *enumIx; + + if (file->fd >= 0) /* reconnect-needed flag */ + goto done; + if ((servinfo = __pmGetAddrInfo(file->me.sock.host)) == NULL) + goto done; + /* Loop over the addresses resolved for this host name until one of them + connects. */ + enumIx = NULL; + for (myaddr = __pmHostEntGetSockAddr(servinfo, &enumIx); + myaddr != NULL; + myaddr = __pmHostEntGetSockAddr(servinfo, &enumIx)) { + if (__pmSockAddrIsInet(myaddr)) + fd = __pmCreateSocket(); + else if (__pmSockAddrIsIPv6(myaddr)) + fd = __pmCreateIPv6Socket(); + else { + __pmNotifyErr(LOG_ERR, "invalid address family: %d\n", + __pmSockAddrGetFamily(myaddr)); + fd = -1; + } + + if (fd < 0) { + __pmSockAddrFree(myaddr); + continue; /* Try the next address */ + } + + __pmSockAddrSetPort(myaddr, files->me.sock.port); + sts = __pmConnect(fd, (void *)myaddr, __pmSockAddrSize()); + __pmSockAddrFree(myaddr); + if (sts == 0) /* good connection */ + break; + + /* Try the next address */ + __pmCloseSocket(fd); + fd = -1; + } + + if (fd >= 0) + files->fd = fd; + +done: + if (myaddr) + __pmSockAddrFree(myaddr); + if (servinfo) + __pmHostEntFree(servinfo); +} + +static void +local_connection(files_t *file) +{ + if (file->type == FILE_TAIL) + local_log_rotated(file); + else if (file->type == FILE_TAIL) + local_reconnector(file); +} + +void +local_pmdaMain(pmdaInterface *self) +{ + static char buffer[4096]; + int pmcdfd, nready, nfds, i, j, count, fd, maxfd = -1; + __pmFdSet fds, readyfds; + ssize_t bytes; + size_t offset; + char *s, *p; + + if ((pmcdfd = __pmdaInFd(self)) < 0) + exit(1); + + for (i = 0; i < ntimers; i++) + timers[i].id = __pmAFregister(&timers[i].delta, &timers[i].cookie, + timer_callback); + + /* custom PMDA main loop */ + for (count = 0; ; count++) { + struct timeval timeout = { 1, 0 }; + + __pmFD_ZERO(&fds); + __pmFD_SET(pmcdfd, &fds); + for (i = 0; i < nfiles; i++) { + if (files[i].type == FILE_TAIL) + continue; + fd = files[i].fd; + __pmFD_SET(fd, &fds); + if (fd > maxfd) + maxfd = fd; + } + nfds = ((pmcdfd > maxfd) ? pmcdfd : maxfd) + 1; + + __pmFD_COPY(&readyfds, &fds); + nready = __pmSelectRead(nfds, &readyfds, &timeout); + if (nready < 0) { + if (neterror() != EINTR) { + __pmNotifyErr(LOG_ERR, "select failed: %s\n", + netstrerror()); + exit(1); + } + continue; + } + + __pmAFblock(); + + if (__pmFD_ISSET(pmcdfd, &readyfds)) { + if (__pmdaMainPDU(self) < 0) { + __pmAFunblock(); + exit(1); + } + } + + for (i = 0; i < nfiles; i++) { + fd = files[i].fd; + /* check for log rotation or host reconnection needed */ + if ((count % 10) == 0) /* but only once every 10 */ + local_connection(&files[i]); + if (files[i].type != FILE_TAIL && !(__pmFD_ISSET(fd, &readyfds))) + continue; + offset = 0; +multiread: + bytes = __pmRead(fd, buffer + offset, sizeof(buffer)-1 - offset); + if (bytes < 0) { + if ((files[i].type == FILE_TAIL) && + (oserror() == EINTR) || + (oserror() == EAGAIN) || + (oserror() == EWOULDBLOCK)) + continue; + if (files[i].type == FILE_SOCK) { + close(files[i].fd); + files[i].fd = -1; + continue; + } + __pmNotifyErr(LOG_ERR, "Data read error on %s: %s\n", + local_filetype(files[i].type), osstrerror()); + exit(1); + } + if (bytes == 0) { + if (files[i].type == FILE_TAIL) + continue; + __pmNotifyErr(LOG_ERR, "No data to read - %s may be closed\n", + local_filetype(files[i].type)); + exit(1); + } + buffer[sizeof(buffer)-1] = '\0'; + for (s = p = buffer, j = 0; + *s != '\0' && j < sizeof(buffer)-1; + s++, j++) { + if (*s != '\n') + continue; + *s = '\0'; + /*__pmNotifyErr(LOG_INFO, "Input callback: %s\n", p);*/ + input_callback(files[i].callback, files[i].cookie, p); + p = s + 1; + } + if (files[i].type == FILE_TAIL) { + /* did we just do a full buffer read? */ + if (p == buffer) { + __pmNotifyErr(LOG_ERR, "Ignoring long line: \"%s\"\n", p); + } else if (j == sizeof(buffer) - 1) { + offset = sizeof(buffer)-1 - (p - buffer); + memmove(buffer, p, offset); + goto multiread; /* read rest of line */ + } + } + } + + __pmAFunblock(); + } +} |