summaryrefslogtreecommitdiff
path: root/src/pmmgr/pmmgr.cxx
diff options
context:
space:
mode:
authorIgor Pashev <pashev.igor@gmail.com>2014-10-26 12:33:50 +0400
committerIgor Pashev <pashev.igor@gmail.com>2014-10-26 12:33:50 +0400
commit47e6e7c84f008a53061e661f31ae96629bc694ef (patch)
tree648a07f3b5b9d67ce19b0fd72e8caa1175c98f1a /src/pmmgr/pmmgr.cxx
downloadpcp-debian.tar.gz
Debian 3.9.10debian/3.9.10debian
Diffstat (limited to 'src/pmmgr/pmmgr.cxx')
-rw-r--r--src/pmmgr/pmmgr.cxx1285
1 files changed, 1285 insertions, 0 deletions
diff --git a/src/pmmgr/pmmgr.cxx b/src/pmmgr/pmmgr.cxx
new file mode 100644
index 0000000..5ff210f
--- /dev/null
+++ b/src/pmmgr/pmmgr.cxx
@@ -0,0 +1,1285 @@
+/*
+ * Copyright (c) 2013-2014 Red Hat.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * for more details.
+ */
+
+#ifndef _XOPEN_SOURCE
+#define _XOPEN_SOURCE 600
+#endif
+#include "pmmgr.h"
+#include "impl.h"
+
+#include <sys/stat.h>
+#include <cassert>
+#include <cstdlib>
+#include <fstream>
+#include <iostream>
+
+extern "C" {
+#include <fcntl.h>
+#include <unistd.h>
+#include <glob.h>
+#include <sys/wait.h>
+#ifdef HAVE_PTHREAD_H
+#include <pthread.h>
+#endif
+#ifdef IS_LINUX
+#include <sys/syscall.h>
+#endif
+}
+
+
+using namespace std;
+
+// ------------------------------------------------------------------------
+
+
+int quit;
+int polltime = 60;
+
+
+// ------------------------------------------------------------------------
+
+
+// Create a string that is safe to pass to system(3), i.e., sh -c,
+// by quoting metacharacters. This transform generally should be
+// applied only once.
+string
+sh_quote(const string& input)
+{
+ string output;
+ for (unsigned i=0; i<input.length(); i++)
+ {
+ char c = input[i];
+ if ((ispunct(c) || isspace(c)) && // quite aggressive
+ (c != ':' && c != '.' && c != '_' && c != '/' && c != '-')) // safe & popular punctuation
+ output += '\\';
+ output += c;
+ }
+
+ return output;
+}
+
+
+// Print a string to cout/cerr progress reports, similar to the
+// stuff produced by __pmNotifyErr
+ostream&
+timestamp(ostream &o)
+{
+ time_t now;
+ time (&now);
+ char *now2 = ctime (&now);
+ if (now2)
+ now2[19] = '\0'; // overwrite \n
+
+ return o << "[" << (now2 ? now2 : "") << "] " << pmProgname << "("
+ << getpid()
+#ifdef HAVE_PTHREAD_H
+#ifdef IS_LINUX
+ << "/" << syscall(SYS_gettid)
+#else
+ << "/" << pthread_self()
+#endif
+#endif
+ << "): ";
+}
+
+
+extern "C" void *
+pmmgr_daemon_poll_thread (void* a)
+{
+ pmmgr_daemon* d = (pmmgr_daemon*) a;
+ d->poll();
+ return 0;
+}
+
+
+// A wrapper for something like system(3), but responding quicker to
+// interrupts and standardizing tracing.
+int
+pmmgr_configurable::wrap_system(const std::string& cmd)
+{
+ if (pmDebug & DBG_TRACE_APPL0)
+ timestamp(cout) << "running " << cmd << endl;
+
+ int pid = fork();
+ if (pid == 0)
+ {
+ // child
+ int rc = execl ("/bin/sh", "sh", "-c", cmd.c_str(), NULL);
+ timestamp(cerr) << "failed to execl sh -c " << cmd << " rc=" << rc << endl;
+ _exit (1);
+ }
+ else if (pid < 0)
+ {
+ // error
+ timestamp(cerr) << "fork for " << cmd << " failed: errno=" << errno << endl;
+ return -1;
+ }
+ else
+ {
+ // parent
+ int status = -1;
+ int rc;
+ //timestamp(cout) << "waiting for pid=" << pid << endl;
+
+ do { rc = waitpid(pid, &status, 0); } while (!quit && rc == -1 && errno == EINTR); // TEMP_FAILURE_RETRY
+ if (quit)
+ {
+ // timestamp(cout) << "killing pid=" << pid << endl;
+ kill (pid, SIGTERM); // just to be on the safe side
+ // it might linger a few seconds in zombie mode
+ }
+
+ //timestamp(cout) << "done status=" << status << endl;
+ if (status != 0)
+ timestamp(cerr) << "system(" << cmd << ") failed: rc=" << status << endl;
+ return status;
+ }
+}
+
+
+
+// ------------------------------------------------------------------------
+
+
+pmmgr_configurable::pmmgr_configurable(const string& dir):
+ config_directory(dir)
+{
+}
+
+
+vector<string>
+pmmgr_configurable::get_config_multi(const string& file) const
+{
+ vector<string> lines;
+
+ string complete_filename = config_directory + (char)__pmPathSeparator() + file;
+ ifstream f (complete_filename.c_str());
+ while (f.good()) {
+ string line;
+ getline(f, line);
+ if (! f.good())
+ break;
+ if (line != "")
+ lines.push_back(line);
+ }
+
+ return lines;
+}
+
+
+bool
+pmmgr_configurable::get_config_exists(const string& file) const
+{
+ string complete_filename = config_directory + (char)__pmPathSeparator() + file;
+ ifstream f (complete_filename.c_str());
+ return (f.good());
+}
+
+
+string
+pmmgr_configurable::get_config_single(const string& file) const
+{
+ vector<string> lines = get_config_multi (file);
+ if (lines.size() == 1)
+ return lines[0];
+ else
+ return "";
+}
+
+ostream&
+pmmgr_configurable::timestamp(ostream& o)
+{
+ return ::timestamp(o) << config_directory << ": ";
+}
+
+
+
+// ------------------------------------------------------------------------
+
+
+pmMetricSpec*
+pmmgr_job_spec::parse_metric_spec (const string& spec)
+{
+ if (parsed_metric_cache.find(spec) != parsed_metric_cache.end())
+ return parsed_metric_cache[spec];
+
+ const char* specstr = spec.c_str();
+ pmMetricSpec* pms = 0;
+ char *errmsg;
+ char dummy_host[] = "";
+ int rc = pmParseMetricSpec (specstr,
+ 0, dummy_host, /* both ignored */
+ & pms, & errmsg);
+ if (rc < 0) {
+ timestamp(cerr) << "hostid-metrics '" << specstr << "' parse error: " << errmsg << endl;
+ free (errmsg);
+ }
+
+ parsed_metric_cache[spec] = pms;
+ return pms;
+}
+
+
+pmmgr_hostid
+pmmgr_job_spec::compute_hostid (const pcp_context_spec& ctx)
+{
+ int pmc = pmNewContext (PM_CONTEXT_HOST, ctx.c_str());
+ if (pmc < 0)
+ return "";
+
+ // parse all the hostid metric specifications
+ vector<string> hostid_specs = get_config_multi("hostid-metrics");
+ if (hostid_specs.size() == 0)
+ hostid_specs.push_back(string("pmcd.hostname"));
+
+ // fetch all hostid metrics in sequence
+ vector<string> hostid_fields;
+ for (unsigned i=0; i<hostid_specs.size(); i++)
+ {
+ pmMetricSpec* pms = parse_metric_spec (hostid_specs[i]);
+
+ pmID pmid;
+ int rc = pmLookupName (1, & pms->metric, &pmid);
+ if (rc < 0)
+ continue;
+
+ pmDesc desc;
+ rc = pmLookupDesc (pmid, & desc);
+ if (rc < 0)
+ continue;
+
+ if (desc.type != PM_TYPE_STRING)
+ continue;
+
+ if ((desc.indom != PM_INDOM_NULL) && pms->ninst > 0)
+ {
+ // reset the indom to include all elements
+ rc = pmDelProfile(desc.indom, 0, (int *)0);
+ if (rc < 0)
+ continue;
+
+ int *inums = (int *) malloc (pms->ninst * sizeof(int));
+ if (inums == NULL)
+ continue;
+ // NB: after this point, 'continue' must also free(inums);
+
+ // map the instance names to instance numbers
+ unsigned numinums_used = 0;
+ for (int j=0; j<pms->ninst; j++)
+ {
+ int inum = pmLookupInDom (desc.indom, pms->inst[j]);
+ if (inum < 0)
+ continue;
+ inums[numinums_used++] = inum;
+ }
+
+ // add the selected instances to the profile
+ rc = pmAddProfile (desc.indom, numinums_used, inums);
+ free (inums);
+ if (rc < 0)
+ continue;
+ }
+
+ // fetch the values
+ pmResult *r;
+ rc = pmFetch (1, &pmid, &r);
+ if (rc < 0)
+ continue;
+ // NB: after this point, 'continue' must also pmFreeResult(r)
+
+ // in-place sort value list by indom number
+ pmSortInstances(r);
+
+ // only vset[0] will be set, for csb->pmid
+ if (r->vset[0]->numval > 0)
+ {
+ for (int j=0; j<r->vset[0]->numval; j++) // iterate over instances
+ {
+ // fetch the string value
+ pmAtomValue av;
+ rc = pmExtractValue(r->vset[0]->valfmt,
+ & r->vset[0]->vlist[j],
+ PM_TYPE_STRING, & av, PM_TYPE_STRING);
+ if (rc < 0)
+ continue;
+
+ // at last! we have a string we can accumulate
+ hostid_fields.push_back (av.cp);
+ free (av.cp);
+ }
+ }
+
+ (void) pmFreeResult (r);
+ }
+
+ (void) pmDestroyContext (pmc);
+
+ // Sanitize the host-id metric values into a single string that is
+ // suitable for posix-portable-filenames, and not too ugly for
+ // someone to look at or type in.
+ //
+ // http://www.opengroup.org/onlinepubs/007904975/basedefs/xbd_chap03.html
+ string sanitized;
+ for (unsigned i=0; i<hostid_fields.size(); i++)
+ {
+ const string& f = hostid_fields[i];
+ if (f == "") continue;
+ if (sanitized != "") sanitized += "-"; // separate fields
+ for (unsigned j=0; j<f.length(); j++)
+ {
+ char c = f[j];
+ if (isalnum(c))
+ sanitized += c;
+ else if (c== '-' || c == '.' || c == '_')
+ sanitized += c;
+ else
+ // drop other non-portable characters NB: this can mean
+ // unintentional duplication in IDs, which a user can work
+ // around by configuring additional hostid metrics.
+ ;
+ }
+ }
+
+ return pmmgr_hostid (sanitized);
+}
+
+
+pmmgr_job_spec::pmmgr_job_spec(const std::string& config_directory):
+ pmmgr_configurable(config_directory)
+{
+ // We don't actually have to do any configuration parsing at this
+ // time. Let's do it during poll(), which makes us more responsive
+ // to run-time changes.
+}
+
+
+pmmgr_job_spec::~pmmgr_job_spec()
+{
+ // free any cached pmMetricSpec's
+ for (map<string,pmMetricSpec*>::iterator it = parsed_metric_cache.begin();
+ it != parsed_metric_cache.end();
+ ++it)
+ free (it->second); // aka pmFreeMetricSpec
+
+ // kill all our daemons created during poll()
+ for (map<pmmgr_hostid,pcp_context_spec>::iterator it = known_targets.begin();
+ it != known_targets.end();
+ ++it)
+ note_dead_hostid (it->first);
+}
+
+
+// ------------------------------------------------------------------------
+
+
+void
+pmmgr_job_spec::poll()
+{
+ if (quit) return;
+
+ // phase 1: run all discovery/probing functions to collect context-spec's
+ set<pcp_context_spec> new_specs;
+
+ vector<string> target_hosts = get_config_multi("target-host");
+ for (unsigned i=0; i<target_hosts.size(); i++)
+ new_specs.insert(target_hosts[i]);
+
+ vector<string> target_discovery = get_config_multi("target-discovery");
+ for (unsigned i=0; i<target_discovery.size() && !quit; i++)
+ {
+ char **urls = NULL;
+ const char *discovery = (target_discovery[i] == "")
+ ? NULL
+ : target_discovery[i].c_str();
+ int numUrls = pmDiscoverServices (PM_SERVER_SERVICE_SPEC, discovery, &urls);
+ if (numUrls <= 0)
+ continue;
+ for (int i=0; i<numUrls; i++)
+ new_specs.insert(string(urls[i]));
+ free ((void*) urls);
+ }
+
+ // fallback to logging the local server, if nothing else is configured/discovered
+ if (target_hosts.size() == 0 &&
+ target_discovery.size() == 0)
+ new_specs.insert("local:");
+
+ // phase 2: move previously-identified targets over, so we can tell who
+ // has come or gone
+ const map<pmmgr_hostid,pcp_context_spec> old_known_targets = known_targets;
+ known_targets.clear();
+
+ // phase 3: map the context-specs to hostids to find new hosts
+ map<pmmgr_hostid,double> known_target_scores;
+ for (set<pcp_context_spec>::iterator it = new_specs.begin();
+ it != new_specs.end() && !quit;
+ ++it)
+ {
+ struct timeval before, after;
+ __pmtimevalNow(& before);
+ pmmgr_hostid hostid = compute_hostid (*it);
+ __pmtimevalNow(& after);
+ double score = __pmtimevalSub(& after, & before); // the smaller, the preferreder
+
+ if (hostid != "") // verified existence/liveness
+ {
+ if (pmDebug & DBG_TRACE_APPL0)
+ timestamp(cout) << "hostid " << hostid << " via " << *it << " time " << score << endl;
+
+ if (known_target_scores.find(hostid) == known_target_scores.end() ||
+ known_target_scores[hostid] > score) // previous slower than this one
+ {
+ known_targets[hostid] = *it;
+ known_target_scores[hostid] = score;
+ }
+ }
+ }
+
+ // phase 4a: compare old_known_targets vs. known_targets: look for any recently died
+ for (map<pmmgr_hostid,pcp_context_spec>::const_iterator it = old_known_targets.begin();
+ it != old_known_targets.end();
+ ++it)
+ {
+ const pmmgr_hostid& hostid = it->first;
+ if (known_targets.find(hostid) == known_targets.end())
+ note_dead_hostid (hostid);
+ }
+
+ // phase 4b: compare new known_targets & old_known_targets: look for recently born
+ for (map<pmmgr_hostid,pcp_context_spec>::const_iterator it = known_targets.begin();
+ it != known_targets.end();
+ ++it)
+ {
+ const pmmgr_hostid& hostid = it->first;
+ if (old_known_targets.find(hostid) == old_known_targets.end())
+ note_new_hostid (hostid, known_targets[hostid]);
+ }
+
+ // phase 5: poll all the live daemons
+ // NB: there is a parallelism opportunity, as running many pmlogconf/etc.'s in series
+ // is a possible bottleneck.
+#ifdef HAVE_PTHREAD_H
+ vector<pthread_t> threads;
+#endif
+ for (multimap<pmmgr_hostid,pmmgr_daemon*>::iterator it = daemons.begin();
+ it != daemons.end() && !quit;
+ ++it)
+ {
+#ifdef HAVE_PTHREAD_H
+ pthread_t foo;
+ int rc = pthread_create(&foo, NULL, &pmmgr_daemon_poll_thread, it->second);
+ if (rc == 0)
+ threads.push_back (foo);
+#else
+ int rc = -ENOSUPP;
+#endif
+ if (rc) // threading failed or running single-threaded
+ it->second->poll();
+ }
+
+#ifdef HAVE_PTHREAD_H
+ for (unsigned i=0; i<threads.size(); i++)
+ pthread_join (threads[i], NULL);
+#endif
+
+ // phase 6: garbage-collect ancient log-directory subdirs
+ string subdir_gc = get_config_single("log-subdirectory-gc");
+ if (subdir_gc == "")
+ subdir_gc = "90days";
+ struct timeval tv;
+ char *errmsg;
+ int rc = pmParseInterval(subdir_gc.c_str(), & tv, & errmsg);
+ if (rc < 0)
+ {
+ timestamp(cerr) << "log-subdirectory-gc '" << subdir_gc << "' parse error: " << errmsg << endl;
+ free (errmsg);
+ // default to 90days in another way
+ tv.tv_sec = 60 * 60 * 24 * 90;
+ tv.tv_usec = 0;
+ }
+ time_t now;
+ (void) time(& now);
+
+ // NB: check less frequently?
+
+ // XXX: getting a bit duplicative
+ string default_log_dir =
+ string(pmGetConfig("PCP_LOG_DIR")) + (char)__pmPathSeparator() + "pmmgr";
+ string log_dir = get_config_single ("log-directory");
+ if (log_dir == "") log_dir = default_log_dir;
+ else if(log_dir[0] != '/') log_dir = config_directory + (char)__pmPathSeparator() + log_dir;
+
+ glob_t the_blob;
+ string glob_pattern = log_dir + (char)__pmPathSeparator() + "*";
+ rc = glob (glob_pattern.c_str(),
+ GLOB_NOESCAPE
+#ifdef GLOB_ONLYDIR
+ | GLOB_ONLYDIR
+#endif
+ , NULL, & the_blob);
+ if (rc == 0)
+ {
+ for (unsigned i=0; i<the_blob.gl_pathc && !quit; i++)
+ {
+ string item_name = the_blob.gl_pathv[i];
+
+ // Reject if currently live hostid
+ // NB: basename(3) might modify the argument string, so we don't feed
+ // it item_name.c_str().
+ string target_name = basename(the_blob.gl_pathv[i]);
+ if (known_targets.find(target_name) != known_targets.end())
+ continue;
+
+ struct stat foo;
+ rc = stat (item_name.c_str(), & foo);
+ if (rc == 0 &&
+ S_ISDIR(foo.st_mode) &&
+ (foo.st_mtime + tv.tv_sec) < now)
+ {
+ // <Janine Melnitz>We've got one!!!!!</>
+ timestamp(cout) << "gc subdirectory " << item_name << endl;
+ string cleanup_cmd = "/bin/rm -rf " + sh_quote(item_name);
+ (void) wrap_system(cleanup_cmd);
+ }
+ }
+ }
+ globfree (& the_blob);
+}
+
+
+// ------------------------------------------------------------------------
+
+
+void
+pmmgr_job_spec::note_new_hostid(const pmmgr_hostid& hid, const pcp_context_spec& spec)
+{
+ timestamp(cout) << "new hostid " << hid << " at " << string(spec) << endl;
+
+ if (get_config_exists("pmlogger"))
+ daemons.insert(make_pair(hid, new pmmgr_pmlogger_daemon(config_directory, hid, spec)));
+
+ if (get_config_exists("pmie"))
+ daemons.insert(make_pair(hid, new pmmgr_pmie_daemon(config_directory, hid, spec)));
+}
+
+
+void
+pmmgr_job_spec::note_dead_hostid(const pmmgr_hostid& hid)
+{
+ timestamp(cout) << "dead hostid " << hid << endl;
+
+ pair<multimap<pmmgr_hostid,pmmgr_daemon*>::iterator,
+ multimap<pmmgr_hostid,pmmgr_daemon*>::iterator> range =
+ daemons.equal_range(hid);
+
+ for (multimap<pmmgr_hostid,pmmgr_daemon*>::iterator it = range.first;
+ it != range.second;
+ ++it)
+ delete (it->second);
+
+ daemons.erase(range.first, range.second);
+}
+
+
+// ------------------------------------------------------------------------
+
+
+pmmgr_daemon::pmmgr_daemon(const std::string& config_directory,
+ const pmmgr_hostid& hostid,
+ const pcp_context_spec& spec):
+ pmmgr_configurable(config_directory),
+ hostid(hostid),
+ spec(spec),
+ pid(0),
+ last_restart_attempt(0)
+{
+}
+
+
+pmmgr_pmlogger_daemon::pmmgr_pmlogger_daemon(const std::string& config_directory,
+ const pmmgr_hostid& hostid,
+ const pcp_context_spec& spec):
+ pmmgr_daemon(config_directory, hostid, spec)
+{
+}
+
+
+pmmgr_pmie_daemon::pmmgr_pmie_daemon(const std::string& config_directory,
+ const pmmgr_hostid& hostid,
+ const pcp_context_spec& spec):
+ pmmgr_daemon(config_directory, hostid, spec)
+{
+}
+
+
+pmmgr_daemon::~pmmgr_daemon()
+{
+ if (pid != 0)
+ {
+ int ignored;
+ (void) kill ((pid_t) pid, SIGTERM);
+ (void) waitpid ((pid_t) pid, &ignored, 0); // collect zombie
+ if (pmDebug & DBG_TRACE_APPL0)
+ timestamp(cout) << "daemon pid " << pid << " killed" << endl;
+ }
+}
+
+
+void pmmgr_daemon::poll()
+{
+ if (quit) return;
+
+ if (pid != 0) // test if it's still alive
+ {
+ // reap it if it might have died
+ int ignored;
+ int rc = waitpid ((pid_t) pid, &ignored, WNOHANG);
+
+ rc = kill ((pid_t) pid, 0);
+ if (rc < 0)
+ {
+ if (pmDebug & DBG_TRACE_APPL0)
+ timestamp(cout) << "daemon pid " << pid << " found dead" << endl;
+ pid = 0;
+ // we will try again immediately
+ }
+ }
+
+ if (pid == 0) // needs a restart
+ {
+ time_t now;
+ time (& now);
+
+ // Prevent an error in the environment or the pmmgr daemon
+ // command lines from generating a tight loop of failure /
+ // retry, wasting time and log file space. Limit retry attempts
+ // to one per poll interval (pmmgr -p N parameter).
+ if (last_restart_attempt && (last_restart_attempt + polltime) >= now)
+ return; // quietly, without attempting to restart
+
+ string commandline = daemon_command_line(); // <--- may take many seconds!
+
+ // NB: Note this time as a restart attempt, even if daemon_command_line()
+ // returned an empty string, so that we don't try to restart it too soon.
+ // We note this time rather than the beginning of daemon_command_line(),
+ // to ensure at least polltime seconds of rest between attempts.
+ last_restart_attempt = now;
+
+ if (quit) return; // without starting the daemon process
+
+ if (commandline == "") // error in some intermediate processing stage
+ {
+ timestamp(cerr) << "failed to prepare daemon command line" << endl;
+ return;
+ }
+
+ // We are going to run the daemon with sh -c, but on some versions of
+ // sh, this doesn't imply an exec, which interferes with signalling.
+ // Enforce exec on even these shells.
+ commandline = string("exec ") + commandline;
+
+ if (pmDebug & DBG_TRACE_APPL0)
+ timestamp(cout) << "fork/exec sh -c " << commandline << endl;
+ pid = fork();
+ if (pid == 0) // child process
+ {
+ int rc = execl ("/bin/sh", "sh", "-c", commandline.c_str(), NULL);
+ timestamp(cerr) << "failed to execl sh -c " << commandline << " rc=" << rc << endl;
+ _exit (1);
+ // parent will try again at next poll
+ }
+ else if (pid < 0) // failed fork
+ {
+ timestamp(cerr) << "failed to fork for sh -c " << commandline << endl;
+ pid = 0;
+ // we will try again at next poll
+ }
+ else // congratulations! we're apparently a parent
+ {
+ if (pmDebug & DBG_TRACE_APPL0)
+ timestamp(cout) << "daemon pid " << pid << " started: " << commandline << endl;
+ }
+ }
+}
+
+
+std::string
+pmmgr_pmlogger_daemon::daemon_command_line()
+{
+ string default_log_dir =
+ string(pmGetConfig("PCP_LOG_DIR")) + (char)__pmPathSeparator() + "pmmgr";
+ string log_dir = get_config_single ("log-directory");
+ if (log_dir == "") log_dir = default_log_dir;
+ else if(log_dir[0] != '/') log_dir = config_directory + (char)__pmPathSeparator() + log_dir;
+
+ (void) mkdir2 (log_dir.c_str(), 0777); // implicitly consults umask(2)
+
+ string host_log_dir = log_dir + (char)__pmPathSeparator() + hostid;
+ (void) mkdir2 (host_log_dir.c_str(), 0777);
+ // (errors creating actual files under host_log_dir will be noted shortly)
+
+ string pmlogger_command =
+ string(pmGetConfig("PCP_BIN_DIR")) + (char)__pmPathSeparator() + "pmlogger";
+ string pmlogger_options = sh_quote(pmlogger_command);
+ pmlogger_options += " " + get_config_single ("pmlogger") + " ";
+
+ // run pmlogconf if requested
+ if (get_config_exists("pmlogconf"))
+ {
+ string pmlogconf_output_file = host_log_dir + (char)__pmPathSeparator() + "config.pmlogger";
+ (void) unlink (pmlogconf_output_file.c_str());
+ string pmlogconf_command =
+ string(pmGetConfig("PCP_BINADM_DIR")) + (char)__pmPathSeparator() + "pmlogconf";
+ string pmlogconf_options =
+ sh_quote(pmlogconf_command)
+ + " -c -r -h " + sh_quote(spec)
+ + " " + get_config_single ("pmlogconf")
+ + " " + sh_quote(pmlogconf_output_file)
+ + " >/dev/null"; // pmlogconf is too chatty
+
+ int rc = wrap_system(pmlogconf_options);
+ if (rc) return "";
+
+ pmlogger_options += " -c " + sh_quote(pmlogconf_output_file);
+ }
+
+ // collect -h direction
+ pmlogger_options += " -h " + sh_quote(spec);
+
+ // hard-code -r to report metrics & expected disk usage rate
+ pmlogger_options += " -r";
+
+ // collect subsidiary pmlogger diagnostics
+ pmlogger_options += " -l " + sh_quote(host_log_dir + (char)__pmPathSeparator() + "pmlogger.log");
+
+ // do log merging
+ if (get_config_exists ("pmlogmerge"))
+ {
+ string pmlogextract_command =
+ string(pmGetConfig("PCP_BIN_DIR")) + (char)__pmPathSeparator() + "pmlogextract";
+
+ string pmlogcheck_command =
+ string(pmGetConfig("PCP_BIN_DIR")) + (char)__pmPathSeparator() + "pmlogcheck";
+
+ string pmlogrewrite_command =
+ string(pmGetConfig("PCP_BINADM_DIR")) + (char)__pmPathSeparator() + "pmlogrewrite";
+
+ string pmlogextract_options = sh_quote(pmlogextract_command);
+
+ string retention = get_config_single ("pmlogmerge-retain");
+ if (retention == "") retention = "14days";
+ struct timeval retention_tv;
+ char *errmsg;
+ int rc = pmParseInterval(retention.c_str(), &retention_tv, &errmsg);
+ if (rc)
+ {
+ timestamp(cerr) << "pmlogmerge-retain '" << retention << "' parse error: " << errmsg << endl;
+ free (errmsg);
+ retention = "14days";
+ retention_tv.tv_sec = 14*24*60*60;
+ retention_tv.tv_usec = 0;
+ }
+ pmlogextract_options += " -S -" + sh_quote(retention);
+
+ // Arrange our new pmlogger to kill itself after the given
+ // period, to give us a chance to rerun.
+ string period = get_config_single ("pmlogmerge");
+ if (period == "") period = "24hours";
+ struct timeval period_tv;
+ rc = pmParseInterval(period.c_str(), &period_tv, &errmsg);
+ if (rc)
+ {
+ timestamp(cerr) << "pmlogmerge '" << period << "' parse error: " << errmsg << endl;
+ free (errmsg);
+ period = "24hours";
+ period_tv.tv_sec = 24*60*60;
+ period_tv.tv_usec = 0;
+ }
+ if (get_config_exists ("pmlogmerge-granular"))
+ {
+ // adjust stopping time to the next multiple of period
+ struct timeval now_tv;
+ __pmtimevalNow (&now_tv);
+ time_t period_s = period_tv.tv_sec;
+ if (period_s < 1) period_s = 1; // at least one second
+ time_t period_end = ((now_tv.tv_sec + period_s - 1) / period_s) * period_s;
+ period = string(" @") +
+ string(ctime(& period_end)).substr(0,24); // 24: ctime(3) magic value, sans \n
+ }
+ pmlogger_options += " -y -T " + sh_quote(period); // NB: pmmgr host local time!
+
+ // Find prior archives by globbing for *.index files,
+ // just like pmlogger_merge does.
+ // Er ... but aren't .index files optional?
+ vector<string> mergeable_archives; // those to merge
+ glob_t the_blob;
+ string glob_pattern = host_log_dir + (char)__pmPathSeparator() + "*.index";
+ rc = glob (glob_pattern.c_str(), GLOB_NOESCAPE, NULL, & the_blob);
+ if (rc == 0)
+ {
+ // compute appropriate
+ struct timeval now_tv;
+ __pmtimevalNow (&now_tv);
+ time_t period_s = period_tv.tv_sec;
+ if (period_s < 1) period_s = 1; // at least one second
+ time_t prior_period_start = ((now_tv.tv_sec - period_s) / period_s) * period_s;
+ time_t prior_period_end = prior_period_start + period_s;
+
+ for (unsigned i=0; i<the_blob.gl_pathc; i++)
+ {
+ if (quit) return "";
+
+ string index_name = the_blob.gl_pathv[i];
+ string base_name = index_name.substr(0,index_name.length()-6); // trim .index
+
+ // Manage retention based upon the stat timestamps of the .index file,
+ // because the archives might be so corrupt that even loglabel-based
+ // checks could fail. Non-corrupt archives will have already been merged
+ // into a fresher archive.
+ struct stat foo;
+ rc = stat (the_blob.gl_pathv[i], & foo);
+ if (rc)
+ {
+ // this apprx. can't happen
+ timestamp(cerr) << "stat '" << the_blob.gl_pathv[i] << "' error; skipping cleanup" << endl;
+ continue; // likely nothing can be done to this one
+ }
+ else if ((foo.st_mtime + retention_tv.tv_sec) < now_tv.tv_sec)
+ {
+ string bnq = sh_quote(base_name);
+ string cleanup_cmd = string("/bin/rm -f")
+ + " " + bnq + ".[0-9]*"
+ + " " + bnq + ".index" +
+ + " " + bnq + ".meta";
+
+ (void) wrap_system(cleanup_cmd);
+ continue; // it's gone now; don't try to merge it or anything
+ }
+
+ if (quit) return "";
+
+ // sic pmlogcheck on it; if it is broken, pmlogextract
+ // will give up and make no progress
+ string pmlogcheck_options = sh_quote(pmlogcheck_command);
+ pmlogcheck_options += " " + sh_quote(base_name) + " >/dev/null";
+
+ rc = wrap_system(pmlogcheck_options);
+ if (rc != 0)
+ {
+ timestamp(cerr) << "corrupt archive " << base_name << " preserved." << endl;
+ continue;
+ }
+
+ if (quit) return "";
+
+ // In granular mode, skip if this file is too old or too new. NB: Decide
+ // based upon the log-label, not fstat timestamps, since files postdate
+ // the time region they cover.
+ if (get_config_exists ("pmlogmerge-granular"))
+ {
+ // One could do this the pmloglabel(1) __pmLog* way,
+ // rather than the pmlogsummary(1) PMAPI way.
+
+ int ctx = pmNewContext(PM_CONTEXT_ARCHIVE, base_name.c_str());
+ if (ctx < 0)
+ continue; // skip; gc later
+
+ pmLogLabel label;
+ rc = pmGetArchiveLabel (& label);
+ if (rc < 0)
+ continue; // skip; gc later
+
+ if (label.ll_start.tv_sec >= prior_period_end) // archive too new?
+ {
+ if (pmDebug & DBG_TRACE_APPL0)
+ timestamp(cout) << "skipping merge of too-new archive " << base_name << endl;
+ pmDestroyContext (ctx);
+ continue;
+ }
+
+ struct timeval archive_end;
+ rc = pmGetArchiveEnd(&archive_end);
+ if (rc < 0)
+ {
+ pmDestroyContext (ctx);
+ continue; // skip; gc later
+ }
+
+ if (archive_end.tv_sec < prior_period_start) // archive too old?
+ {
+ if (pmDebug & DBG_TRACE_APPL0)
+ timestamp(cout) << "skipping merge of too-old archive " << base_name << endl;
+ pmDestroyContext (ctx);
+ continue; // skip; gc later
+ }
+
+ pmDestroyContext (ctx);
+ // fallthrough: the archive intersects the prior_period_{start,end} interval
+
+ // XXX: What happens for archives that span across granular periods?
+ }
+
+ mergeable_archives.push_back (base_name);
+ }
+ globfree (& the_blob);
+ }
+
+ string timestr = "archive";
+ time_t now2 = time(NULL);
+ struct tm *now = gmtime(& now2);
+ if (now != NULL)
+ {
+ char timestr2[100];
+ int rc = strftime(timestr2, sizeof(timestr2), "-%Y%m%d.%H%M%S", now);
+ if (rc > 0)
+ timestr += timestr2;
+ }
+ string merged_archive_name = host_log_dir + (char)__pmPathSeparator() + timestr;
+
+ if (mergeable_archives.size() > 1) // 1 or 0 are not worth merging!
+ {
+ // assemble final bits of pmlogextract command line: the inputs and the output
+ for (unsigned i=0; i<mergeable_archives.size(); i++)
+ {
+ if (quit) return "";
+
+ if (get_config_exists("pmlogmerge-rewrite"))
+ {
+ string pmlogrewrite_options = sh_quote(pmlogrewrite_command);
+ pmlogrewrite_options += " -i " + get_config_single("pmlogmerge-rewrite");
+ pmlogrewrite_options += " " + sh_quote(mergeable_archives[i]);
+
+ (void) wrap_system(pmlogrewrite_options.c_str());
+ // In case of error, don't break; let's try to merge it anyway.
+ // Maybe pmlogrewrite will succeed and will get rid of this file.
+ }
+
+ pmlogextract_options += " " + sh_quote(mergeable_archives[i]);
+ }
+
+ if (quit) return "";
+
+ pmlogextract_options += " " + sh_quote(merged_archive_name);
+
+ rc = wrap_system(pmlogextract_options.c_str());
+ if (rc == 0)
+ {
+ // zap the previous archive files
+ //
+ // Don't skip this upon "if (quit)", since the new merged archive is already complete;
+ // it'd be a waste to keep these files around for a future re-merge.
+ for (unsigned i=0; i<mergeable_archives.size(); i++)
+ {
+ string base_name = sh_quote(mergeable_archives[i]);
+ string cleanup_cmd = string("/bin/rm -f")
+ + " " + base_name + ".[0-9]*"
+ + " " + base_name + ".index" +
+ + " " + base_name + ".meta";
+
+ (void) wrap_system(cleanup_cmd.c_str());
+ }
+ }
+ }
+ }
+
+ // synthesize a logfile name similarly as pmlogger_check, but add %S (seconds)
+ // to reduce likelihood of conflict with a short poll interval
+ string timestr = "archive";
+ time_t now2 = time(NULL);
+ struct tm *now = gmtime(& now2);
+ if (now != NULL)
+ {
+ char timestr2[100];
+ int rc = strftime(timestr2, sizeof(timestr2), "-%Y%m%d.%H%M%S", now);
+ if (rc > 0)
+ timestr += timestr2; // no sh_quote required
+ }
+
+ // last argument
+ pmlogger_options += " " + sh_quote(host_log_dir + (char)__pmPathSeparator() + timestr);
+
+ return pmlogger_options;
+}
+
+
+std::string
+pmmgr_pmie_daemon::daemon_command_line()
+{
+ string default_log_dir =
+ string(pmGetConfig("PCP_LOG_DIR")) + (char)__pmPathSeparator() + "pmmgr";
+ string log_dir = get_config_single ("log-directory");
+ if (log_dir == "") log_dir = default_log_dir;
+ else if(log_dir[0] != '/') log_dir = config_directory + (char)__pmPathSeparator() + log_dir;
+
+ (void) mkdir2 (log_dir.c_str(), 0777); // implicitly consults umask(2)
+
+ string host_log_dir = log_dir + (char)__pmPathSeparator() + hostid;
+ (void) mkdir2 (host_log_dir.c_str(), 0777);
+ // (errors creating actual files under host_log_dir will be noted shortly)
+
+ string pmie_command =
+ string(pmGetConfig("PCP_BIN_DIR")) + (char)__pmPathSeparator() + "pmie";
+ string pmie_options = sh_quote (pmie_command);
+
+ pmie_options += " " + get_config_single ("pmie") + " ";
+
+ // run pmieconf if requested
+ if (get_config_exists ("pmieconf"))
+ {
+ string pmieconf_output_file = host_log_dir + (char)__pmPathSeparator() + "config.pmie";
+ string pmieconf_command =
+ string(pmGetConfig("PCP_BIN_DIR")) + (char)__pmPathSeparator() + "pmieconf";
+
+ // NB: pmieconf doesn't take a host name as an argument, unlike pmlogconf
+ string pmieconf_options =
+ sh_quote(pmieconf_command)
+ + " -F -c " + get_config_single ("pmieconf")
+ + " -f " + sh_quote(pmieconf_output_file);
+
+ int rc = wrap_system(pmieconf_options.c_str());
+ if (rc) return "";
+
+ pmie_options += "-c " + sh_quote(pmieconf_output_file);
+ }
+
+ if (quit) return "";
+
+ // collect -h direction
+ pmie_options += " -h " + sh_quote(spec);
+
+ // collect -f, to get it to run in the foreground, avoid setuid
+ pmie_options += " -f";
+
+ // collect subsidiary pmlogger diagnostics
+ pmie_options += " -l " + sh_quote(host_log_dir + (char)__pmPathSeparator() + "pmie.log");
+
+ return pmie_options;
+}
+
+
+
+// ------------------------------------------------------------------------
+
+
+extern "C"
+void handle_interrupt (int sig)
+{
+ // Propagate signal to inferior processes (just once, to prevent
+ // recursive signals or whatnot, despite sa_mask in
+ // setup_signals()).
+ if (quit == 0)
+ kill(-getpid(), SIGTERM);
+
+ quit ++;
+ if (quit > 3) // ignore 1 from user; 1 from kill(-getpid) above; 1 from same near main() exit
+ {
+ char msg[] = "Too many interrupts received, exiting.\n";
+ int rc = write (2, msg, sizeof(msg)-1);
+ if (rc) {/* Do nothing; we don't care if our last gasp went out. */ ;}
+ // XXX: send a suicide signal to the process group?
+ _exit (1);
+ }
+}
+
+extern "C"
+void ignore_signal (int sig)
+{
+ (void) sig;
+}
+
+
+
+void setup_signals()
+{
+ // NB: we eschew __pmSetSignalHandler, since it uses signal(3),
+ // whose behavior is less predictable than sigaction(2).
+
+ struct sigaction sa;
+ memset(&sa, 0, sizeof(sa));
+ sa.sa_handler = handle_interrupt;
+ sigemptyset (&sa.sa_mask);
+ sigaddset (&sa.sa_mask, SIGHUP);
+ sigaddset (&sa.sa_mask, SIGPIPE);
+ sigaddset (&sa.sa_mask, SIGINT);
+ sigaddset (&sa.sa_mask, SIGTERM);
+ sigaddset (&sa.sa_mask, SIGXFSZ);
+ sigaddset (&sa.sa_mask, SIGXCPU);
+ sa.sa_flags = SA_RESTART;
+ sigaction (SIGHUP, &sa, NULL);
+ sigaction (SIGPIPE, &sa, NULL);
+ sigaction (SIGINT, &sa, NULL);
+ sigaction (SIGTERM, &sa, NULL);
+ sigaction (SIGXFSZ, &sa, NULL);
+ sigaction (SIGXCPU, &sa, NULL);
+}
+
+
+
+// ------------------------------------------------------------------------
+
+static pmOptions opts;
+static pmLongOptions longopts[] =
+ {
+ PMAPI_OPTIONS_HEADER("Options"),
+ PMOPT_DEBUG,
+ { "config", 1, 'c', "DIR", "configuration directory [default $PCP_SYSCONF_DIR/pmmgr]" },
+ { "poll", 1, 'p', "NUM", "set pmcd polling interval [default 60]" },
+ { "username", 1, 'U', "USER", "switch to named user account [default pcp]" },
+ { "log", 1, 'l', "PATH", "redirect diagnostics and trace output" },
+ { "verbose", 0, 'v', 0, "verbose diagnostics to stderr" },
+ PMOPT_HELP,
+ PMAPI_OPTIONS_END
+ };
+
+int main (int argc, char *argv[])
+{
+ /* Become our own process group, to assist signal passing to children. */
+ setpgid(getpid(), 0);
+ setup_signals();
+
+ string default_config_dir =
+ string(pmGetConfig("PCP_SYSCONF_DIR")) + (char)__pmPathSeparator() + "pmmgr";
+ vector<pmmgr_job_spec*> js;
+
+ int c;
+ char* username_str;
+ __pmGetUsername(& username_str);
+ string username = username_str;
+ char* output_filename = NULL;
+
+ opts.long_options = longopts;
+ opts.short_options = "D:c:vp:U:l:?";
+
+ while ((c = pmgetopt_r(argc, argv, &opts)) != EOF)
+ {
+ switch (c)
+ {
+ case 'D': // undocumented
+ if ((c = __pmParseDebug(opts.optarg)) < 0)
+ {
+ pmprintf("%s: unrecognized debug flag specification (%s)\n",
+ pmProgname, opts.optarg);
+ opts.errors++;
+ }
+ else
+ {
+ pmDebug |= c;
+ }
+ break;
+
+ case 'l':
+ output_filename = opts.optarg;
+ break;
+
+ case 'v':
+ pmDebug |= DBG_TRACE_APPL0;
+ break;
+
+ case 'p':
+ polltime = atoi(opts.optarg);
+ if (polltime <= 0)
+ {
+ pmprintf("%s: poll time too short\n", pmProgname);
+ opts.errors++;
+ }
+ break;
+
+ case 'c':
+ js.push_back (new pmmgr_job_spec(opts.optarg));
+ break;
+
+ case 'U':
+ username = opts.optarg;
+ break;
+
+ default:
+ opts.errors++;
+ }
+ }
+
+ if (opts.errors)
+ {
+ pmUsageMessage(&opts);
+ exit(1);
+ }
+
+ // default
+ if (js.size() == 0)
+ js.push_back (new pmmgr_job_spec(default_config_dir));
+
+ // let pmdapmcd know pmmgr is currently running
+ if (__pmServerCreatePIDFile(pmProgname, PM_FATAL_ERR) < 0)
+ exit(1);
+
+ // lose root privileges if we have them
+ __pmSetProcessIdentity(username.c_str());
+
+ // (re)create log file, redirect stdout/stderr
+ // NB: must be done after __pmSetProcessIdentity() for proper file permissions
+ if (output_filename)
+ {
+ int fd;
+ (void) unlink (output_filename); // in case one's left over from a previous other-uid run
+ fd = open (output_filename, O_WRONLY|O_APPEND|O_CREAT|O_TRUNC, 0666);
+ if (fd < 0)
+ timestamp(cerr) << "Cannot re-create logfile " << output_filename << endl;
+ else
+ {
+ int rc;
+ // Move the new file descriptors on top of stdout/stderr
+ rc = dup2 (fd, STDOUT_FILENO);
+ if (rc < 0) // rather unlikely
+ timestamp(cerr) << "Cannot redirect logfile to stdout" << endl;
+ rc = dup2 (fd, STDERR_FILENO);
+ if (rc < 0) // rather unlikely
+ timestamp(cerr) << "Cannot redirect logfile to stderr" << endl;
+ rc = close (fd);
+ if (rc < 0) // rather unlikely
+ timestamp(cerr) << "Cannot close logfile fd" << endl;
+ }
+
+ }
+
+ timestamp(cout) << "Log started" << endl;
+ while (! quit)
+ {
+ // In this section, we must not fidget with SIGCHLD, due to use of system(3).
+ for (unsigned i=0; i<js.size() && !quit; i++)
+ js[i]->poll();
+
+ if (quit)
+ break;
+
+ // We want to respond quickly if a child daemon process dies.
+ (void) signal (SIGCHLD, ignore_signal);
+ (void) signal (SIGALRM, ignore_signal);
+ alarm (polltime);
+ pause ();
+ alarm (0);
+ (void) signal (SIGCHLD, SIG_DFL);
+ (void) signal (SIGALRM, SIG_DFL);
+ }
+
+ // NB: don't let this cleanup be interrupted by pending-quit signals;
+ // we want the daemon pid's killed.
+ for (unsigned i=0; i<js.size(); i++)
+ delete js[i];
+
+ timestamp(cout) << "Log finished" << endl;
+
+ // Send a last-gasp signal out, just in case daemons somehow missed
+ kill(-getpid(), SIGTERM);
+
+ return 0;
+}