summaryrefslogtreecommitdiff
path: root/qa/src/test_pcp.python
diff options
context:
space:
mode:
Diffstat (limited to 'qa/src/test_pcp.python')
-rwxr-xr-xqa/src/test_pcp.python481
1 files changed, 481 insertions, 0 deletions
diff --git a/qa/src/test_pcp.python b/qa/src/test_pcp.python
new file mode 100755
index 0000000..e165705
--- /dev/null
+++ b/qa/src/test_pcp.python
@@ -0,0 +1,481 @@
+#
+# Copyright (C) 2012-2014 Red Hat Inc.
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by the
+# Free Software Foundation; either version 2 of the License, or (at your
+# option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+# for more details.
+#
+
+import sys
+import unittest
+from pcp import pmapi
+import cpmapi as api
+
+# Utilities
+
+def dump_seq(name_p, seq_p):
+ print(name_p)
+ for text in seq_p:
+ if type(text) == type(int()) or type(text) == type(long()):
+ print(hex(text))
+ else:
+ print(text)
+ print('')
+
+def dump_array_ptrs(name_p, arr_p):
+ print(name_p)
+ for i in xrange(len(arr_p)):
+ if (i > 0):
+ print(" ", arr_p[i].contents)
+ else:
+ print(arr_p[i].contents)
+
+def dump_array(name_p, arr_p):
+ print(name_p)
+ for i in xrange(len(arr_p)):
+ if (i > 0):
+ print(" ", hex(arr_p[i]))
+ else:
+ print(hex(arr_p[i]))
+
+ARCHIVE = "" # For testing either localhost or archive
+
+TRAVERSE_CALLBACK_COUNT = 0 # callback for pmTraversePMNS
+
+def traverse_callback(arg):
+ global TRAVERSE_CALLBACK_COUNT
+ TRAVERSE_CALLBACK_COUNT += 1
+
+def test_pcp(self):
+
+ if (ARCHIVE == ""):
+ print('Running as localhost')
+ ctx = pmapi.pmContext(api.PM_CONTEXT_HOST, "localhost")
+ self.local_type = True
+ else:
+ print('Running as archive', ARCHIVE)
+ ctx = pmapi.pmContext(api.PM_CONTEXT_ARCHIVE, ARCHIVE)
+ self.archive_type = True
+
+ # pmGetContextHostName
+ hostname = ctx.pmGetContextHostName()
+ print("pmGetContextHostName:", hostname)
+ self.assertTrue(len(hostname) >= 0)
+
+ # pmParseMetricSpec
+ source = 'localhost'
+ try:
+ (rsltp, errmsg) = ctx.pmParseMetricSpec("kernel.all.load", 0, source)
+ print("pmParseMetricSpec:", rsltp.contents.source)
+ self.assertTrue(rsltp.contents.source == source)
+ except pmapi.pmErr as error:
+ print("pmParseMetricSpec error: ", error)
+
+ # Get number of cpus
+ # pmLookupName
+ try:
+ self.ncpu_id = ctx.pmLookupName(("hinv.ncpu", "kernel.all.load", "hinv.machine"))
+ print("pmLookupName:", self.ncpu_id)
+ self.assertTrue(True)
+ except pmapi.pmErr as error:
+ self.assertTrue(False)
+
+ # pmIDStr
+ print("pmIDStr:", ctx.pmIDStr(self.ncpu_id[0]))
+ self.assertTrue(ctx.pmIDStr(self.ncpu_id[0]).count(".") > 1)
+
+ # pmLookupDesc
+ try:
+ descs = ctx.pmLookupDescs(self.ncpu_id)
+ self.assertTrue(True)
+ except pmapi.pmErr as error:
+ self.assertTrue(False)
+
+ # pmFetch
+ try:
+ results = ctx.pmFetch(self.ncpu_id)
+ print("pmFetch:", results)
+ self.assertTrue(True)
+ except pmapi.pmErr as error:
+ print("pmFetch: ", error)
+ self.assertTrue(False)
+
+ # pmExtractValue
+ atom = ctx.pmExtractValue(results.contents.get_valfmt(0),
+ results.contents.get_vlist(0, 0),
+ descs[0].contents.type,
+ api.PM_TYPE_U32)
+ self.assertTrue(atom.ul > 0)
+ print("pmExtractValue:", atom.ul)
+ atom = ctx.pmExtractValue(results.contents.get_valfmt(0),
+ results.contents.get_vlist(2, 0),
+ descs[0].contents.type,
+ api.PM_TYPE_U32)
+ machine_u32 = atom.ll
+ atom = ctx.pmExtractValue(results.contents.get_valfmt(2),
+ results.contents.get_vlist(2, 0),
+ descs[2].contents.type,
+ api.PM_TYPE_STRING)
+ machine_vp = atom.vp
+ self.assertTrue(machine_u32 != machine_vp)
+ self.assertTrue(type(atom.cp) == type(''))
+
+ # pmGetChildren
+ if not self.archive_type:
+ gcs = ctx.pmGetChildren("kernel")
+ print("pmGetChildren:", gcs)
+ self.assertTrue(len(gcs) >=2)
+
+ # pmGetChildrenStatus
+ gcs = ctx.pmGetChildrenStatus("kernel")
+ print("pmGetChildrenStatus:", gcs)
+ self.assertTrue(len(gcs[0]) == len(gcs[1]))
+
+ # pmGetPMNSLocation
+ index = ctx.pmGetPMNSLocation()
+ print("pmGetPMNSLocation:", index)
+ self.assertTrue(index == api.PMNS_ARCHIVE or
+ index == api.PMNS_LOCAL or
+ index == api.PMNS_REMOTE)
+
+ # pmTraversePMNS
+ ctx.pmTraversePMNS("kernel", traverse_callback)
+ print("pmTraversePMNS:", TRAVERSE_CALLBACK_COUNT)
+ self.assertTrue(TRAVERSE_CALLBACK_COUNT > 0)
+
+ # pmLookupName
+ try:
+ badid = ctx.pmLookupName("A_BAD_METRIC")
+ self.assertTrue(False)
+ except pmapi.pmErr as error:
+ print("pmLookupName:", error)
+ self.assertTrue(True)
+
+ metrics = ("no.such.metric.name", "sample.bin")
+ try:
+ pmidlist = ctx.pmLookupName(metrics, relaxed = 1)
+ print("pmLookupName relaxed mode", metrics)
+ self.assertTrue(len(pmidlist) == 2)
+ self.assertTrue(pmidlist[0] == api.PM_ID_NULL)
+ self.assertTrue(pmidlist[1] != api.PM_ID_NULL)
+ except pmapi.pmErr as error:
+ print("pmLookupName relaxed mode:", error)
+ self.assertTrue(False)
+
+ metrics = ("kernel.all.load", "sample.bin",
+ "kernel.percpu.cpu.sys", "mem.freemem")
+ try:
+ self.metric_ids = ctx.pmLookupName(metrics)
+ self.assertTrue(True)
+ except pmapi.pmErr as error:
+ print("pmLookupName:", error)
+ self.assertTrue(False)
+ dump_array("pmLookupName", self.metric_ids)
+ self.assertTrue(len(self.metric_ids) == 4)
+
+ for i in xrange(len(metrics)):
+ # pmNameAll
+ nameall = ctx.pmNameAll(self.metric_ids[i])
+ print("pmNameAll:", nameall[0])
+ self.assertTrue(nameall[0] == metrics[i])
+
+ # pmNameID
+ name = ctx.pmNameID(self.metric_ids[i])
+ print("pmNameID:", name)
+ self.assertTrue(name == metrics[i])
+
+ # pmLookupDesc
+ descs = ctx.pmLookupDescs(self.metric_ids[i])
+ dump_array_ptrs("pmLookupDesc", descs)
+ self.assertTrue(len(descs) == 1)
+
+ descs = ctx.pmLookupDescs(self.metric_ids)
+ if self.local_type:
+ # pmGetInDom - sample.bin
+ (inst, name) = ctx.pmGetInDom(descs[1])
+ print("pmGetInDom:", name)
+ self.assertTrue(len(inst) == 9 and len(name) == 9)
+ else:
+ # pmGetInDomArchive - sample.bin
+ (inst, name) = ctx.pmGetInDomArchive(descs[1])
+ print("pmGetInDomArchive:", name)
+ self.assertTrue(len(inst) == 9 and len(name) == 9)
+
+ # pmInDomStr
+ indomstr = ctx.pmInDomStr(descs[0])
+ print("pmInDomStr:", indomstr)
+ self.assertTrue(indomstr.count(".") >= 1)
+
+ # pmDelProfile
+ try:
+ ctx.pmDelProfile(descs[0], None)
+ print("pmDelProfile: ok")
+ self.assertTrue(True)
+ except pmapi.pmErr as error:
+ print("pmDelProfile: ", error)
+ self.assertTrue(False)
+
+ if self.local_type:
+ # pmLookupInDom
+ inst1 = ctx.pmLookupInDom(descs[0], "1 minute")
+ print("pmLookupInDom:", inst1)
+ else:
+ # pmLookupInDomArchive
+ inst1 = ctx.pmLookupInDomArchive(descs[0], "1 minute")
+ print("pmLookupInDomArchive:", inst1)
+ self.assertTrue(inst1 >= 0)
+
+ if self.local_type:
+ # pmNameInDom
+ instname = ctx.pmNameInDom(descs[0], inst1)
+ print("pmNameInDom:", instname)
+ else:
+ # pmNameInDomArchive
+ instname = ctx.pmNameInDomArchive(descs[0], inst1)
+ print("pmNameInDomArchive:", instname)
+ self.assertTrue(instname == "1 minute")
+
+ try:
+ # pmLookupInDomText
+ ctx.pmLookupInDomText(descs[0])
+ self.assertTrue(False)
+ except pmapi.pmErr as error:
+ print("pmLookupInDomText:", error)
+ self.assertTrue(True)
+
+ # pmAddProfile
+ try:
+ ctx.pmAddProfile(descs[0], inst1)
+ self.assertTrue(True)
+ except pmapi.pmErr as error:
+ print("pmAddProfile: ", error)
+ self.assertTrue(False)
+
+ inst = 0
+ try:
+ # pmLookupInDom
+ inst = ctx.pmLookupInDom(descs[0], "gg minute")
+ self.assertTrue(False)
+ except pmapi.pmErr as error:
+ print("pmLookupInDom:", error)
+ self.assertTrue(True)
+
+ if self.local_type:
+ # pmLookupInDom
+ inst5 = ctx.pmLookupInDom(descs[0], "5 minute")
+ inst15 = ctx.pmLookupInDom(descs[0], "15 minute")
+ print("pmLookupInDom:", inst5, inst15)
+ else:
+ # pmLookupInDomArchive
+ inst5 = ctx.pmLookupInDomArchive(descs[0], "5 minute")
+ inst15 = ctx.pmLookupInDomArchive(descs[0], "15 minute")
+ print("pmLookupInDomArchive:", inst5, inst15)
+ self.assertTrue(inst15 >= 0)
+
+ # pmAddProfile
+ try:
+ ctx.pmAddProfile(descs[0], inst15)
+ print("pmAddProfile:")
+ self.assertTrue(True)
+ except pmapi.pmErr as error:
+ print("pmAddProfile: ", error)
+ self.assertTrue(False)
+
+ # pmParseInterval
+ try:
+ (delta, errmsg) = ctx.pmParseInterval("3 seconds")
+ print("pmParseInterval:", delta)
+ self.assertTrue(True)
+ except pmapi.pmErr as error:
+ print("pmFetch: ", error)
+ self.assertTrue(False)
+
+ # pmFetch
+ try:
+ results = ctx.pmFetch(self.metric_ids)
+ print("pmFetch:", results)
+ self.assertTrue(True)
+ except pmapi.pmErr as error:
+ print("pmFetch: ", error)
+ self.assertTrue(False)
+
+ # pmSortInstances
+ try:
+ ctx.pmSortInstances(results)
+ print("pmSortInstances: ok")
+ except pmapi.pmErr as error:
+ print("pmSortInstances: ", error)
+ self.assertTrue(False)
+
+ # pmStore
+ try:
+ ctx.pmStore(results)
+ self.assertTrue(False)
+ except pmapi.pmErr as error:
+ print("pmStore: ", error)
+ self.assertTrue(True)
+
+ for i in xrange(results.contents.numpmid):
+ if (results.contents.get_pmid(i) != self.metric_ids[1]):
+ continue
+ for val in xrange(9):
+ # sample.bin - each instance in turn
+ atom = ctx.pmExtractValue(results.contents.get_valfmt(i),
+ results.contents.get_vlist(i, val),
+ descs[i].contents.type,
+ api.PM_TYPE_FLOAT)
+ print("pmExtractValue", val, atom.f)
+ self.assertTrue(99*(val+1) <= atom.f and atom.f <= 101*(val+1))
+
+ # pmExtractValue
+ for i in xrange(results.contents.numpmid):
+ if (results.contents.get_pmid(i) != self.metric_ids[3]):
+ continue
+ # mem.freemem
+ tmpatom = ctx.pmExtractValue(results.contents.get_valfmt(i),
+ results.contents.get_vlist(i, 0),
+ descs[i].contents.type,
+ api.PM_TYPE_FLOAT)
+ self.assertTrue(tmpatom.f > 0)
+
+ # pmConvScale
+ atom = ctx.pmConvScale(api.PM_TYPE_FLOAT, tmpatom, descs, 3,
+ api.PM_SPACE_MBYTE)
+ print("pmConvScale, integer arg", tmpatom.f, atom.f)
+ self.assertTrue(atom.f > 0)
+ atom = ctx.pmConvScale(api.PM_TYPE_FLOAT, tmpatom, descs, 3,
+ pmapi.pmUnits(1,0,0,api.PM_SPACE_MBYTE,0,0))
+ print("pmConvScale, pmUnits arg", tmpatom.f, atom.f)
+ self.assertTrue(atom.f > 0)
+
+ # pmAtomStr
+ atomstr = ctx.pmAtomStr(atom, api.PM_TYPE_FLOAT)
+ print("pmAtomStr", atomstr)
+
+ # pmtimevalSleep
+ ctx.pmtimevalSleep(delta)
+ print("pmtimevalSleep")
+
+ # pmDupContext
+ context = ctx.pmDupContext()
+ print("pmDupContext", context)
+ self.assertTrue(context >= 0)
+
+ # pmWhichContext
+ try:
+ ctx.pmWhichContext()
+ print("pmWhichContext: ok")
+ self.assertTrue(True)
+ except pmapi.pmErr as error:
+ print("pmWhichContext: ", error)
+ self.assertTrue(False)
+
+ # pmTypeStr
+ typestr = ctx.pmTypeStr(api.PM_TYPE_FLOAT)
+ print("pmTypeStr", typestr)
+ self.assertTrue(typestr == "FLOAT")
+
+ if self.archive_type:
+ # pmSetMode
+ try:
+ ctx.pmSetMode(api.PM_MODE_INTERP, results.contents.timestamp, 0)
+ print("pmSetMode: ok")
+ self.assertTrue(True)
+ except pmapi.pmErr as error:
+ print("pmSetMode: ", error)
+ self.assertTrue(False)
+
+
+ # pmGetArchiveLabel
+ try:
+ loglabel = ctx.pmGetArchiveLabel()
+ print("pmGetArchiveLabel: ok")
+ self.assertTrue(True)
+ except pmapi.pmErr as error:
+ print("pmGetArchiveLabel: ", error)
+ self.assertTrue(False)
+
+ # pmGetArchiveEnd
+ try:
+ tval = ctx.pmGetArchiveEnd()
+ print("pmGetArchiveEnd: ", tval)
+ self.assertTrue(True)
+ except pmapi.pmErr as error:
+ print("pmGetArchiveEnd: ", error)
+ self.assertTrue(False)
+
+ # pmPrintValue
+ if not self.archive_type:
+ print("pmPrintValue:")
+ ctx.pmPrintValue(sys.__stdout__, results, descs[0], 0, 0, 8)
+ print('')
+
+ # pmReconnectContext
+ try:
+ ctx.pmReconnectContext()
+ print("pmReconnectContext: ok")
+ self.assertTrue(True)
+ except pmapi.pmErr as error:
+ print("pmReconnectContext: ", error)
+ self.assertTrue(False)
+
+ # pmRegisterDerived
+ try:
+ ctx.pmRegisterDerived("pcpqa.derivation",
+ "sample.double.bin * delta(sample.double.bin_ctr)")
+ average = ctx.pmLookupName("pcpqa.derivation")
+ descs = ctx.pmLookupDescs(average)
+ results = ctx.pmFetch(average)
+ ctx.pmtimevalSleep(delta)
+ results = ctx.pmFetch(average)
+ atom = ctx.pmExtractValue(results.contents.get_valfmt(0),
+ results.contents.get_vlist(0, 0),
+ descs[0].contents.type,
+ api.PM_TYPE_DOUBLE)
+ self.assertTrue(True)
+ except pmapi.pmErr as error:
+ self.assertTrue(False)
+ try:
+ ctx.pmRegisterDerived("a.bad.expression", "a $ bad @ expression")
+ self.assertTrue(False)
+ except pmapi.pmErr as error:
+ print("pmRegisterDerived: ", ctx.pmDerivedErrStr())
+ self.assertTrue(True)
+
+ # pmFreeResult
+ ctx.pmFreeResult(results)
+ print("pmFreeResult")
+
+ del ctx
+
+class TestSequenceFunctions(unittest.TestCase):
+
+ ncpu_id = []
+ metric_ids = []
+ archive_type = False
+ local_type = False
+
+ def test_context(self):
+ test_pcp(self)
+
+
+if __name__ == '__main__':
+
+ if (len(sys.argv) == 2):
+ ARCHIVE = sys.argv[1]
+ elif (len(sys.argv) > 2):
+ print("Usage: " + sys.argv[0] + " OptionalArchivePath")
+ sys.exit()
+ else:
+ ARCHIVE = ""
+
+ sys.argv[1:] = ()
+
+ STS = unittest.main()
+ sys.exit(STS)