#
# Copyright (C) 2012-2014 Red Hat Inc.
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
# for more details.
#

import sys
import unittest
from pcp import pmapi
import cpmapi as api

# Integer types to render in hex.  `long` only exists on Python 2; on
# Python 3 the lookup raises NameError and plain `int` covers everything.
try:
    INTEGER_TYPES = (int, long)
except NameError:
    INTEGER_TYPES = (int,)


# Utilities

def dump_seq(name_p, seq_p):
    """Print name_p, then every item of seq_p (integers in hex), then ''."""
    print(name_p)
    for text in seq_p:
        # bool is excluded so only genuine integers are shown in hex.
        if isinstance(text, INTEGER_TYPES) and not isinstance(text, bool):
            print(hex(text))
        else:
            print(text)
    print('')


def dump_array_ptrs(name_p, arr_p):
    """Print name_p, then the .contents of every element of arr_p.

    Every element after the first is printed with a leading " " argument.
    Only len() and indexing are used on arr_p.
    """
    print(name_p)
    for i in range(len(arr_p)):
        if i > 0:
            print(" ", arr_p[i].contents)
        else:
            print(arr_p[i].contents)


def dump_array(name_p, arr_p):
    """Print name_p, then hex() of every element of arr_p.

    Every element after the first is printed with a leading " " argument.
    Only len() and indexing are used on arr_p.
    """
    print(name_p)
    for i in range(len(arr_p)):
        if i > 0:
            print(" ", hex(arr_p[i]))
        else:
            print(hex(arr_p[i]))


ARCHIVE = ""                    # For testing either localhost or archive

TRAVERSE_CALLBACK_COUNT = 0     # callback for pmTraversePMNS


def traverse_callback(arg):
    """Count one pmTraversePMNS callback invocation; arg is ignored."""
    global TRAVERSE_CALLBACK_COUNT
    TRAVERSE_CALLBACK_COUNT += 1


def test_pcp(self):
    """Exercise the pmapi.pmContext methods against a host or an archive.

    self is the unittest.TestCase used for assertions; its ncpu_id,
    metric_ids, local_type and archive_type attributes are written here.
    A "localhost" host context is created when the module-level ARCHIVE is
    empty, otherwise an archive context on ARCHIVE.

    A pmapi.pmErr from a call that is expected to succeed is reported via
    self.fail() with the error text; calls that are expected to raise
    pmapi.pmErr fail the test when they return normally.
    """

    if ARCHIVE == "":
        print('Running as localhost')
        ctx = pmapi.pmContext(api.PM_CONTEXT_HOST, "localhost")
        self.local_type = True
    else:
        print('Running as archive', ARCHIVE)
        ctx = pmapi.pmContext(api.PM_CONTEXT_ARCHIVE, ARCHIVE)
        self.archive_type = True

    # pmGetContextHostName
    hostname = ctx.pmGetContextHostName()
    print("pmGetContextHostName:", hostname)
    self.assertTrue(len(hostname) >= 0)

    # pmParseMetricSpec
    source = 'localhost'
    try:
        (rsltp, errmsg) = ctx.pmParseMetricSpec("kernel.all.load", 0, source)
        print("pmParseMetricSpec:", rsltp.contents.source)
        self.assertTrue(rsltp.contents.source == source)
    except pmapi.pmErr as error:
        # A parse error is only reported here, it does not fail the test.
        print("pmParseMetricSpec error: ", error)

    # Get number of cpus
    # pmLookupName
    try:
        self.ncpu_id = ctx.pmLookupName(("hinv.ncpu", "kernel.all.load",
                                         "hinv.machine"))
        print("pmLookupName:", self.ncpu_id)
    except pmapi.pmErr as error:
        self.fail("pmLookupName: %s" % error)

    # pmIDStr
    print("pmIDStr:", ctx.pmIDStr(self.ncpu_id[0]))
    self.assertTrue(ctx.pmIDStr(self.ncpu_id[0]).count(".") > 1)

    # pmLookupDesc
    try:
        descs = ctx.pmLookupDescs(self.ncpu_id)
    except pmapi.pmErr as error:
        self.fail("pmLookupDescs: %s" % error)

    # pmFetch
    try:
        results = ctx.pmFetch(self.ncpu_id)
        print("pmFetch:", results)
    except pmapi.pmErr as error:
        print("pmFetch: ", error)
        self.fail("pmFetch: %s" % error)

    # pmExtractValue
    atom = ctx.pmExtractValue(results.contents.get_valfmt(0),
                              results.contents.get_vlist(0, 0),
                              descs[0].contents.type,
                              api.PM_TYPE_U32)
    self.assertTrue(atom.ul > 0)
    print("pmExtractValue:", atom.ul)
    # The third metric is extracted twice: once with the first metric's
    # value format and type, once with its own as a string; the two
    # resulting atom fields are then required to differ.
    atom = ctx.pmExtractValue(results.contents.get_valfmt(0),
                              results.contents.get_vlist(2, 0),
                              descs[0].contents.type,
                              api.PM_TYPE_U32)
    machine_u32 = atom.ll
    atom = ctx.pmExtractValue(results.contents.get_valfmt(2),
                              results.contents.get_vlist(2, 0),
                              descs[2].contents.type,
                              api.PM_TYPE_STRING)
    machine_vp = atom.vp
    self.assertTrue(machine_u32 != machine_vp)
    self.assertTrue(isinstance(atom.cp, str))

    # pmGetChildren
    if not self.archive_type:
        gcs = ctx.pmGetChildren("kernel")
        print("pmGetChildren:", gcs)
        self.assertTrue(len(gcs) >= 2)

        # pmGetChildrenStatus
        # NOTE(review): nesting reconstructed from a whitespace-mangled
        # source; kept under the same guard as pmGetChildren - confirm.
        gcs = ctx.pmGetChildrenStatus("kernel")
        print("pmGetChildrenStatus:", gcs)
        self.assertTrue(len(gcs[0]) == len(gcs[1]))

    # pmGetPMNSLocation
    index = ctx.pmGetPMNSLocation()
    print("pmGetPMNSLocation:", index)
    self.assertTrue(index == api.PMNS_ARCHIVE or
                    index == api.PMNS_LOCAL or
                    index == api.PMNS_REMOTE)

    # pmTraversePMNS
    ctx.pmTraversePMNS("kernel", traverse_callback)
    print("pmTraversePMNS:", TRAVERSE_CALLBACK_COUNT)
    self.assertTrue(TRAVERSE_CALLBACK_COUNT > 0)

    # pmLookupName
    try:
        ctx.pmLookupName("A_BAD_METRIC")
        self.fail("pmLookupName: expected pmErr for A_BAD_METRIC")
    except pmapi.pmErr as error:
        print("pmLookupName:", error)

    metrics = ("no.such.metric.name", "sample.bin")
    try:
        pmidlist = ctx.pmLookupName(metrics, relaxed = 1)
        print("pmLookupName relaxed mode", metrics)
        self.assertTrue(len(pmidlist) == 2)
        self.assertTrue(pmidlist[0] == api.PM_ID_NULL)
        self.assertTrue(pmidlist[1] != api.PM_ID_NULL)
    except pmapi.pmErr as error:
        print("pmLookupName relaxed mode:", error)
        self.fail("pmLookupName relaxed mode: %s" % error)

    metrics = ("kernel.all.load", "sample.bin", "kernel.percpu.cpu.sys",
               "mem.freemem")
    try:
        self.metric_ids = ctx.pmLookupName(metrics)
    except pmapi.pmErr as error:
        print("pmLookupName:", error)
        self.fail("pmLookupName: %s" % error)
    dump_array("pmLookupName", self.metric_ids)
    self.assertTrue(len(self.metric_ids) == 4)

    for i in range(len(metrics)):
        # pmNameAll
        nameall = ctx.pmNameAll(self.metric_ids[i])
        print("pmNameAll:", nameall[0])
        self.assertTrue(nameall[0] == metrics[i])

        # pmNameID
        name = ctx.pmNameID(self.metric_ids[i])
        print("pmNameID:", name)
        self.assertTrue(name == metrics[i])

        # pmLookupDesc
        descs = ctx.pmLookupDescs(self.metric_ids[i])
        dump_array_ptrs("pmLookupDesc", descs)
        self.assertTrue(len(descs) == 1)

    # From here on descs[i] is the descriptor for metrics[i].
    descs = ctx.pmLookupDescs(self.metric_ids)
    if self.local_type:
        # pmGetInDom - sample.bin
        (inst, name) = ctx.pmGetInDom(descs[1])
        print("pmGetInDom:", name)
        self.assertTrue(len(inst) == 9 and len(name) == 9)
    else:
        # pmGetInDomArchive - sample.bin
        (inst, name) = ctx.pmGetInDomArchive(descs[1])
        print("pmGetInDomArchive:", name)
        self.assertTrue(len(inst) == 9 and len(name) == 9)

    # pmInDomStr
    indomstr = ctx.pmInDomStr(descs[0])
    print("pmInDomStr:", indomstr)
    self.assertTrue(indomstr.count(".") >= 1)

    # pmDelProfile
    try:
        ctx.pmDelProfile(descs[0], None)
        print("pmDelProfile: ok")
    except pmapi.pmErr as error:
        print("pmDelProfile: ", error)
        self.fail("pmDelProfile: %s" % error)

    if self.local_type:
        # pmLookupInDom
        inst1 = ctx.pmLookupInDom(descs[0], "1 minute")
        print("pmLookupInDom:", inst1)
    else:
        # pmLookupInDomArchive
        inst1 = ctx.pmLookupInDomArchive(descs[0], "1 minute")
        print("pmLookupInDomArchive:", inst1)
    self.assertTrue(inst1 >= 0)

    if self.local_type:
        # pmNameInDom
        instname = ctx.pmNameInDom(descs[0], inst1)
        print("pmNameInDom:", instname)
    else:
        # pmNameInDomArchive
        instname = ctx.pmNameInDomArchive(descs[0], inst1)
        print("pmNameInDomArchive:", instname)
    self.assertTrue(instname == "1 minute")

    try:
        # pmLookupInDomText
        ctx.pmLookupInDomText(descs[0])
        self.fail("pmLookupInDomText: expected pmErr")
    except pmapi.pmErr as error:
        print("pmLookupInDomText:", error)

    # pmAddProfile
    try:
        ctx.pmAddProfile(descs[0], inst1)
    except pmapi.pmErr as error:
        print("pmAddProfile: ", error)
        self.fail("pmAddProfile: %s" % error)

    try:
        # pmLookupInDom
        ctx.pmLookupInDom(descs[0], "gg minute")
        self.fail("pmLookupInDom: expected pmErr for 'gg minute'")
    except pmapi.pmErr as error:
        print("pmLookupInDom:", error)

    if self.local_type:
        # pmLookupInDom
        inst5 = ctx.pmLookupInDom(descs[0], "5 minute")
        inst15 = ctx.pmLookupInDom(descs[0], "15 minute")
        print("pmLookupInDom:", inst5, inst15)
    else:
        # pmLookupInDomArchive
        inst5 = ctx.pmLookupInDomArchive(descs[0], "5 minute")
        inst15 = ctx.pmLookupInDomArchive(descs[0], "15 minute")
        print("pmLookupInDomArchive:", inst5, inst15)
    self.assertTrue(inst15 >= 0)

    # pmAddProfile
    try:
        ctx.pmAddProfile(descs[0], inst15)
        print("pmAddProfile:")
    except pmapi.pmErr as error:
        print("pmAddProfile: ", error)
        self.fail("pmAddProfile: %s" % error)

    # pmParseInterval
    try:
        (delta, errmsg) = ctx.pmParseInterval("3 seconds")
        print("pmParseInterval:", delta)
    except pmapi.pmErr as error:
        # Label corrected: this handler used to print "pmFetch: ".
        print("pmParseInterval: ", error)
        self.fail("pmParseInterval: %s" % error)

    # pmFetch
    try:
        results = ctx.pmFetch(self.metric_ids)
        print("pmFetch:", results)
    except pmapi.pmErr as error:
        print("pmFetch: ", error)
        self.fail("pmFetch: %s" % error)

    # pmSortInstances
    try:
        ctx.pmSortInstances(results)
        print("pmSortInstances: ok")
    except pmapi.pmErr as error:
        print("pmSortInstances: ", error)
        self.fail("pmSortInstances: %s" % error)

    # pmStore
    try:
        ctx.pmStore(results)
        self.fail("pmStore: expected pmErr")
    except pmapi.pmErr as error:
        print("pmStore: ", error)

    for i in range(results.contents.numpmid):
        if results.contents.get_pmid(i) != self.metric_ids[1]:
            continue
        for val in range(9):
            # sample.bin - each instance in turn
            atom = ctx.pmExtractValue(results.contents.get_valfmt(i),
                                      results.contents.get_vlist(i, val),
                                      descs[i].contents.type,
                                      api.PM_TYPE_FLOAT)
            print("pmExtractValue", val, atom.f)
            self.assertTrue(99*(val+1) <= atom.f and atom.f <= 101*(val+1))

    # pmExtractValue
    for i in range(results.contents.numpmid):
        if results.contents.get_pmid(i) != self.metric_ids[3]:
            continue
        # mem.freemem
        tmpatom = ctx.pmExtractValue(results.contents.get_valfmt(i),
                                     results.contents.get_vlist(i, 0),
                                     descs[i].contents.type,
                                     api.PM_TYPE_FLOAT)
        self.assertTrue(tmpatom.f > 0)

        # pmConvScale
        atom = ctx.pmConvScale(api.PM_TYPE_FLOAT, tmpatom, descs, 3,
                               api.PM_SPACE_MBYTE)
        print("pmConvScale, integer arg", tmpatom.f, atom.f)
        self.assertTrue(atom.f > 0)
        atom = ctx.pmConvScale(api.PM_TYPE_FLOAT, tmpatom, descs, 3,
                               pmapi.pmUnits(1,0,0,api.PM_SPACE_MBYTE,0,0))
        print("pmConvScale, pmUnits arg", tmpatom.f, atom.f)
        self.assertTrue(atom.f > 0)

        # pmAtomStr
        # NOTE(review): nesting reconstructed from a whitespace-mangled
        # source; kept inside the mem.freemem loop - confirm.
        atomstr = ctx.pmAtomStr(atom, api.PM_TYPE_FLOAT)
        print("pmAtomStr", atomstr)

    # pmtimevalSleep
    ctx.pmtimevalSleep(delta)
    print("pmtimevalSleep")

    # pmDupContext
    context = ctx.pmDupContext()
    print("pmDupContext", context)
    self.assertTrue(context >= 0)

    # pmWhichContext
    try:
        ctx.pmWhichContext()
        print("pmWhichContext: ok")
    except pmapi.pmErr as error:
        print("pmWhichContext: ", error)
        self.fail("pmWhichContext: %s" % error)

    # pmTypeStr
    typestr = ctx.pmTypeStr(api.PM_TYPE_FLOAT)
    print("pmTypeStr", typestr)
    self.assertTrue(typestr == "FLOAT")

    if self.archive_type:
        # pmSetMode
        try:
            ctx.pmSetMode(api.PM_MODE_INTERP, results.contents.timestamp, 0)
            print("pmSetMode: ok")
        except pmapi.pmErr as error:
            print("pmSetMode: ", error)
            self.fail("pmSetMode: %s" % error)

        # pmGetArchiveLabel
        try:
            ctx.pmGetArchiveLabel()
            print("pmGetArchiveLabel: ok")
        except pmapi.pmErr as error:
            print("pmGetArchiveLabel: ", error)
            self.fail("pmGetArchiveLabel: %s" % error)

        # pmGetArchiveEnd
        try:
            tval = ctx.pmGetArchiveEnd()
            print("pmGetArchiveEnd: ", tval)
        except pmapi.pmErr as error:
            print("pmGetArchiveEnd: ", error)
            self.fail("pmGetArchiveEnd: %s" % error)

    # pmPrintValue
    if not self.archive_type:
        print("pmPrintValue:")
        ctx.pmPrintValue(sys.__stdout__, results, descs[0], 0, 0, 8)
        print('')

    # pmReconnectContext
    try:
        ctx.pmReconnectContext()
        print("pmReconnectContext: ok")
    except pmapi.pmErr as error:
        print("pmReconnectContext: ", error)
        self.fail("pmReconnectContext: %s" % error)

    # pmRegisterDerived
    try:
        ctx.pmRegisterDerived("pcpqa.derivation",
                              "sample.double.bin * delta(sample.double.bin_ctr)")
        average = ctx.pmLookupName("pcpqa.derivation")
        descs = ctx.pmLookupDescs(average)
        # Fetch twice, separated by the parsed interval, before extracting.
        results = ctx.pmFetch(average)
        ctx.pmtimevalSleep(delta)
        results = ctx.pmFetch(average)
        atom = ctx.pmExtractValue(results.contents.get_valfmt(0),
                                  results.contents.get_vlist(0, 0),
                                  descs[0].contents.type,
                                  api.PM_TYPE_DOUBLE)
    except pmapi.pmErr as error:
        self.fail("pmRegisterDerived: %s" % error)

    try:
        ctx.pmRegisterDerived("a.bad.expression", "a $ bad @ expression")
        self.fail("pmRegisterDerived: expected pmErr for a bad expression")
    except pmapi.pmErr as error:
        print("pmRegisterDerived: ", ctx.pmDerivedErrStr())

    # pmFreeResult
    ctx.pmFreeResult(results)
    print("pmFreeResult")

    del ctx


class TestSequenceFunctions(unittest.TestCase):
    """Single-test case that drives test_pcp()."""

    # Defaults; test_pcp() rebinds these on the instance it is given.
    ncpu_id = []
    metric_ids = []
    archive_type = False
    local_type = False

    def test_context(self):
        """Run the whole pmapi exercise against this test case."""
        test_pcp(self)


if __name__ == '__main__':

    if len(sys.argv) == 2:
        ARCHIVE = sys.argv[1]
    elif len(sys.argv) > 2:
        print("Usage: " + sys.argv[0] + " OptionalArchivePath")
        sys.exit()
    else:
        ARCHIVE = ""

    # Drop the archive argument so unittest.main() does not try to parse it.
    sys.argv[1:] = ()

    # unittest.main() calls sys.exit() itself with the run's status.
    unittest.main()