'''
Performance Metrics Domain Agent exporting Gluster filesystem metrics.
'''
#
# Copyright (c) 2013-2014 Red Hat.
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
# for more details.
#

import cpmapi as c_api
from pcp.pmapi import pmUnits
from pcp.pmda import PMDA, pmdaMetric, pmdaIndom
from ctypes import c_int, c_long, c_float, POINTER, cast, Structure
try:
    import xml.etree.cElementTree as xmltree
except ImportError:     # the cElementTree alias was removed in Python 3.9
    import xml.etree.ElementTree as xmltree
from os import getenv
import subprocess

try:
    long
except NameError:       # Python 3 has no separate long type
    long = int

VOL_INFO_COMMAND = 'gluster --xml volume info'
VOL_STOP_COMMAND = 'gluster --xml volume profile %s stop'
VOL_START_COMMAND = 'gluster --xml volume profile %s start'
VOL_STATS_COMMAND = 'gluster --xml volume profile %s info'

FILEOPS = [ # append only, do not change the order (changes PMID)
    'ACCESS', 'CREATE', 'DISCARD', 'ENTRYLK', 'FALLOCATE', 'FENTRYLK',
    'FGETXATTR', 'FINODELK', 'FLUSH', 'FREMOVEXATTR', 'FSETATTR',
    'FSETXATTR', 'FSTAT', 'FSYNC', 'FSYNCDIR', 'FTRUNCATE', 'FXATTROP',
    'GETSPEC', 'GETXATTR', 'INODELK', 'LINK', 'LK', 'LOOKUP', 'MKDIR',
    'MKNOD', 'OPEN', 'OPENDIR', 'RCHECKSUM', 'READDIR', 'READDIRP',
    'READLINK', 'READV', 'REMOVEXATTR', 'RENAME', 'RMDIR', 'SETATTR',
    'SETXATTR', 'STAT', 'STATFS', 'SYMLINK', 'TRUNCATE', 'UNLINK',
    'WRITEV', 'XATTROP', 'READ', 'WRITE', 'RELEASE', 'RELEASEDIR', 'FORGET'
]

# Maps a file operation name to its position in FILEOPS.  Built at import
# time so that the lookup table is populated however the module is loaded.
FILEOPS_INDICES = dict((fileop, index) for index, fileop in enumerate(FILEOPS))

# Each file operation contributes this many latency-cluster metrics
# (min, max, avg, count), in that order.
_LATENCY_METRICS_PER_FILEOP = 4


class GlusterVolume(Structure):
    ''' Statistic values per-gluster-volume (volume indom cache lookup) '''
    _fields_ = [("distCount", c_int),
                ("stripeCount", c_int),
                ("replicaCount", c_int),
                ("fopHitsEnabled", c_int),
                ("latencyEnabled", c_int)]


class GlusterBrick(Structure):
    ''' Statistic values per-gluster-brick, within a volume (brick indom) '''
    # The per-operation arrays are indexed by FILEOPS_INDICES[<fop name>].
    _fields_ = [("mintime", c_long * len(FILEOPS)),
                ("maxtime", c_long * len(FILEOPS)),
                ("avgtime", c_float * len(FILEOPS)),
                ("count", c_long * len(FILEOPS)),
                ("read_bytes", c_long),
                ("write_bytes", c_long)]


class GlusterPMDA(PMDA):
    '''
    Performance Metrics Domain Agent exporting gluster brick metrics.
    Install it and make basic use of it if you use glusterfs, as follows:

    # $PCP_PMDAS_DIR/gluster/Install
    $ pminfo -fmdtT gluster
    '''

    # volumes and bricks instance domains; these dictionaries are handed to
    # pmdaIndom() in __init__ and are cleared/refilled in place on each fetch
    volumes = {}
    bricks = {}

    def _run_xml_command(self, command):
        '''
        Run a command through the shell and parse its stdout as XML.

        Returns the root element of the parsed document, or None when the
        output is not well-formed XML (for example, when it is empty).
        '''
        # NOTE(review): the command is interpreted by the shell and may embed
        # a volume name; kept as-is because the command templates can be
        # overridden through the environment - confirm names are trusted.
        process = subprocess.Popen(command, shell=True,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE)
        out, _ = process.communicate()
        # self.log('Command %s output: %s' % (command, out))
        try:
            return xmltree.fromstring(out)
        except xmltree.ParseError:
            # callers treat None as "nothing to report for this command"
            return None

    def runVolumeInfo(self):
        '''
        Execute the gluster volume info command to extract volume names.
        Returns the parsed XML root element, or None if parsing failed.
        '''
        return self._run_xml_command(self.vol_info)

    def runVolumeProfileInfo(self, volume):
        '''
        Execute gluster volume profile info command for a given volume.
        Returns the parsed XML root element, or None if parsing failed.
        '''
        return self._run_xml_command(self.vol_stats % volume)

    def parseVolumeInfo(self, xml):
        '''
        Extract the set of volume names from given gluster XML document.

        Fills self.volumes with one GlusterVolume per volume found and
        returns the list of volume names (empty if there is no
        'volInfo/volumes' element).
        '''
        volumenames = []
        volsxml = xml.find('volInfo/volumes')
        if volsxml is None:
            return volumenames
        for volxml in volsxml.findall('volume'):
            volname = volxml.find('name').text
            volumenames.append(volname)
            volume = GlusterVolume()
            volume.distCount = int(volxml.find('distCount').text)
            volume.stripeCount = int(volxml.find('stripeCount').text)
            volume.replicaCount = int(volxml.find('replicaCount').text)
            volume.fopHitsEnabled = 0
            volume.latencyEnabled = 0
            for option in volxml.findall('options/option'):
                name = option.find('name').text
                value = option.find('value').text
                if name == 'diagnostics.count-fop-hits' and value == 'on':
                    volume.fopHitsEnabled = 1
                if name == 'diagnostics.latency-measurement' and value == 'on':
                    volume.latencyEnabled = 1
            self.volumes[volname] = volume  # prepare the volume indom cache
        return volumenames

    def parseVolumeProfileInfo(self, volume, xml):
        '''
        Extract the metric values from a given gluster profile document.

        Fills self.bricks with one GlusterBrick per 'volProfile/brick'
        element.  The volume argument is accepted but not used here.
        '''
        for brickxml in xml.findall('volProfile/brick'):
            brickname = brickxml.find('brickName').text
            brick = GlusterBrick()
            for fileop in brickxml.findall('cumulativeStats/fopStats/fop'):
                name = fileop.find('name').text
                fop = FILEOPS_INDICES.get(name)
                if fop is None:
                    # self.log('Unrecognised fileops key %s' % name)
                    continue
                brick.count[fop] = long(fileop.find('hits').text)
                brick.avgtime[fop] = float(fileop.find('avgLatency').text)
                # latencies are parsed as floats, then truncated to integers
                brick.mintime[fop] = long(float(fileop.find('minLatency').text))
                brick.maxtime[fop] = long(float(fileop.find('maxLatency').text))
            brick.read_bytes = long(brickxml.find('cumulativeStats/totalRead').text)
            brick.write_bytes = long(brickxml.find('cumulativeStats/totalWrite').text)
            self.bricks[brickname] = brick  # prepare the bricks indom cache

    def gluster_refresh(self):
        ''' Refresh the values and instances for gluster volumes and bricks '''
        info = self.runVolumeInfo()
        if info is None:
            return
        for vol in self.parseVolumeInfo(info):
            profile = self.runVolumeProfileInfo(vol)
            if profile is not None:
                self.parseVolumeProfileInfo(vol, profile)

    def gluster_instance(self, serial):
        ''' Called once per "instance request" PDU '''
        # serials 0 and 1 are the volume and brick indoms set up in __init__
        if serial in (0, 1):
            self.gluster_fetch()

    def gluster_fetch(self):
        ''' Called once per "fetch" PDU '''
        self.bricks.clear()
        self.volumes.clear()
        self.gluster_refresh()
        self.replace_indom(self.brick_indom, self.bricks)
        self.replace_indom(self.volume_indom, self.volumes)

    def gluster_fetch_thruput_callback(self, item, inst):
        '''
        Returns a list of value,status (single pair) for thruput cluster
        Helper for the fetch callback
        '''
        # only items 0 (read_bytes) and 1 (write_bytes) are registered
        if item not in (0, 1):
            return [c_api.PM_ERR_PMID, 0]
        voidp = self.inst_lookup(self.brick_indom, inst)
        if voidp is None:
            return [c_api.PM_ERR_INST, 0]
        brick = cast(voidp, POINTER(GlusterBrick)).contents
        if item == 0:
            return [brick.read_bytes, 1]
        return [brick.write_bytes, 1]

    def gluster_fetch_latency_callback(self, item, inst):
        '''
        Returns a list of value,status (single pair) for latency cluster
        Helper for the fetch callback
        '''
        # valid items are 0 .. len(FILEOPS) * 4 - 1; anything else would
        # index past the end of the per-operation arrays
        if item < 0 or item >= len(FILEOPS) * _LATENCY_METRICS_PER_FILEOP:
            return [c_api.PM_ERR_PMID, 0]
        voidp = self.inst_lookup(self.brick_indom, inst)
        if voidp is None:
            return [c_api.PM_ERR_INST, 0]
        brick = cast(voidp, POINTER(GlusterBrick)).contents
        # items are laid out as min, max, avg, count for each file operation
        fileop, index = divmod(item, _LATENCY_METRICS_PER_FILEOP)
        if index == 0:
            return [brick.mintime[fileop], 1]
        if index == 1:
            return [brick.maxtime[fileop], 1]
        if index == 2:
            return [brick.avgtime[fileop], 1]
        return [brick.count[fileop], 1]

    def gluster_fetch_volumes_callback(self, item, inst):
        '''
        Returns a list of value,status (single pair) for volumes cluster
        Helper for the fetch callback
        '''
        if item < 0 or item > 3:
            return [c_api.PM_ERR_PMID, 0]
        voidp = self.inst_lookup(self.volume_indom, inst)
        if voidp is None:
            return [c_api.PM_ERR_INST, 0]
        volume = cast(voidp, POINTER(GlusterVolume)).contents
        if item == 0:
            # reported as enabled only when both volume options are on
            return [volume.fopHitsEnabled and volume.latencyEnabled, 1]
        if item == 1:
            return [volume.distCount, 1]
        if item == 2:
            return [volume.stripeCount, 1]
        return [volume.replicaCount, 1]

    def gluster_fetch_callback(self, cluster, item, inst):
        '''
        Main fetch callback, defers to helpers for each cluster.
        Returns a list of value,status (single pair) for requested pmid/inst
        '''
        # self.log("fetch callback for %d.%d[%d]" % (cluster, item, inst))
        if cluster == 0:
            return self.gluster_fetch_thruput_callback(item, inst)
        if cluster == 1:
            return self.gluster_fetch_latency_callback(item, inst)
        if cluster == 2:
            return self.gluster_fetch_volumes_callback(item, inst)
        return [c_api.PM_ERR_PMID, 0]

    def gluster_store_volume_callback(self, inst, val):
        '''
        Helper for the store callback, volume profile enabling/disabling.

        A value of zero runs the profile stop command for the volume, any
        positive value runs the profile start command.  Returns 0, or
        PM_ERR_INST for an unknown instance, or PM_ERR_SIGN for a negative
        value.  The exit status of the command is not examined.
        '''
        name = self.inst_name_lookup(self.volume_indom, inst)
        if name is None:
            return c_api.PM_ERR_INST
        if val < 0:
            return c_api.PM_ERR_SIGN
        template = self.vol_stop if val == 0 else self.vol_start
        command = template % name
        # self.log("Running profile command: %s" % command)
        # Popen + communicate() drains both pipes; subprocess.call() with
        # PIPE can block forever once the child fills a pipe buffer.
        process = subprocess.Popen(command, shell=True,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE)
        process.communicate()
        return 0

    def gluster_store_callback(self, cluster, item, inst, val):
        '''
        Store callback, executed when a request to write to a metric happens
        Defers to helpers for each storable metric.  Returns a single value.
        '''
        # self.log("store callback for %d.%d[%d]" % (cluster, item, inst))
        if cluster == 2 and item == 0:
            return self.gluster_store_volume_callback(inst, val)
        if cluster in (0, 1):
            return c_api.PM_ERR_PERMISSION
        return c_api.PM_ERR_PMID

    def __init__(self, name, domain):
        '''
        Set up the command templates (each overridable through a GLUSTER_VOL_*
        environment variable), the two instance domains, all metrics and the
        fetch/instance/store callbacks.
        '''
        PMDA.__init__(self, name, domain)

        self.vol_info = getenv('GLUSTER_VOL_INFO', VOL_INFO_COMMAND)
        self.vol_stop = getenv('GLUSTER_VOL_STOP', VOL_STOP_COMMAND)
        self.vol_start = getenv('GLUSTER_VOL_START', VOL_START_COMMAND)
        self.vol_stats = getenv('GLUSTER_VOL_STATS', VOL_STATS_COMMAND)

        self.volume_indom = self.indom(0)
        self.add_indom(pmdaIndom(self.volume_indom, self.volumes))

        self.brick_indom = self.indom(1)
        self.add_indom(pmdaIndom(self.brick_indom, self.bricks))

        # cluster 0: per-brick throughput
        self.add_metric(name + '.brick.read_bytes',
                pmdaMetric(self.pmid(0, 0),
                           c_api.PM_TYPE_U64, self.brick_indom,
                           c_api.PM_SEM_COUNTER,
                           pmUnits(1, 0, 0, c_api.PM_SPACE_BYTE, 0, 0)))
        self.add_metric(name + '.brick.write_bytes',
                pmdaMetric(self.pmid(0, 1),
                           c_api.PM_TYPE_U64, self.brick_indom,
                           c_api.PM_SEM_COUNTER,
                           pmUnits(1, 0, 0, c_api.PM_SPACE_BYTE, 0, 0)))

        # cluster 1: four consecutive items per file operation, in FILEOPS
        # order - this layout is what the latency fetch callback decodes
        for index, fileop in enumerate(FILEOPS):
            metricname = name + '.brick.latency.' + fileop.lower() + '.'
            item = index * _LATENCY_METRICS_PER_FILEOP
            self.add_metric(metricname + 'min',
                    pmdaMetric(self.pmid(1, item),
                               c_api.PM_TYPE_U64, self.brick_indom,
                               c_api.PM_SEM_INSTANT,
                               pmUnits(0, 1, 0, 0, c_api.PM_TIME_USEC, 0)))
            self.add_metric(metricname + 'max',
                    pmdaMetric(self.pmid(1, item + 1),
                               c_api.PM_TYPE_U64, self.brick_indom,
                               c_api.PM_SEM_INSTANT,
                               pmUnits(0, 1, 0, 0, c_api.PM_TIME_USEC, 0)))
            self.add_metric(metricname + 'avg',
                    pmdaMetric(self.pmid(1, item + 2),
                               c_api.PM_TYPE_FLOAT, self.brick_indom,
                               c_api.PM_SEM_INSTANT,
                               pmUnits(0, 1, 0, 0, c_api.PM_TIME_USEC, 0)))
            self.add_metric(metricname + 'count',
                    pmdaMetric(self.pmid(1, item + 3),
                               c_api.PM_TYPE_U64, self.brick_indom,
                               c_api.PM_SEM_COUNTER,
                               pmUnits(0, 0, 1, 0, 0, c_api.PM_COUNT_ONE)))

        # cluster 2: per-volume settings, items 0..3 in this order
        metricname = name + '.volume.'
        volume_metrics = ('profile', 'dist.count',
                          'stripe.count', 'replica.count')
        for item, suffix in enumerate(volume_metrics):
            self.add_metric(metricname + suffix,
                    pmdaMetric(self.pmid(2, item),
                               c_api.PM_TYPE_32, self.volume_indom,
                               c_api.PM_SEM_INSTANT,
                               pmUnits(0, 0, 0, 0, 0, 0)))

        self.set_fetch(self.gluster_fetch)
        self.set_instance(self.gluster_instance)
        self.set_fetch_callback(self.gluster_fetch_callback)
        self.set_store_callback(self.gluster_store_callback)


if __name__ == '__main__':
    GlusterPMDA('gluster', 118).run()