diff --git a/collector.py b/collector.py index 3e91d63..d994f71 100644 --- a/collector.py +++ b/collector.py @@ -109,6 +109,8 @@ class WekaCollector(object): 'WRITE_LATENCY': 'microsecs' } + RPC_STATS = {} # populated from config['rpc_stats'] in __init__ + weka_stat_list = {} # category: {{ stat:unit}, {stat:unit}} def __init__(self, config, cluster_obj): # wekaCollector @@ -134,7 +136,9 @@ def __init__(self, config, cluster_obj): # wekaCollector self.cluster = cluster_obj self.wekadata = dict() self.fs_io_stats = dict() + self.rpc_stats = dict() self.fs_map = dict() + self.RPC_STATS = config.get('rpc_stats') or {} global weka_stat_list @@ -153,6 +157,7 @@ def __init__(self, config, cluster_obj): # wekaCollector weka_stat_list[category].update({stat: unit}) one_call_stat.append(f"{category}.{stat}") + # log.debug(f"one_call_stat={len(one_call_stat)}") # log.debug(f"weka_stat_list={weka_stat_list}") @@ -217,6 +222,9 @@ def _reset_metrics(self): 'WekaFS statistics. For more info refer to: https://docs.weka.io/usage/statistics/list-of-statistics', labels=['cluster', 'host_name', 'host_role', 'node_id', 'node_role', 'category', 'stat', 'unit']) + metric_objs['weka_rpc_stats'] = GaugeMetricFamily('weka_rpc_stats', + 'Per-RPC-method statistics', + labels=['cluster', 'stat', 'method', 'unit']) metric_objs['weka_io_histogram'] = GaugeHistogramMetricFamily("weka_blocksize", "weka blocksize distribution histogram", labels=['cluster', 'host_name', 'host_role', @@ -440,6 +448,19 @@ def get_fs_io_stats(self): except Exception as exc: log.error(f"error getting fs_io_stat, {stat} from cluster {self.cluster}: {exc}") + def get_rpc_stats(self): + self.rpc_stats = dict() + for stat, unit in self.RPC_STATS.items(): + try: + result = self.cluster.call_api('stats_show', parms={ + 'category': 'rpc', 'stat': stat, 'interval': '1m', + 'param': {'method': '*'}, 'per_process': True, 'show_internal': True}) + if result is not None and 'all' in result: + if 'rpc' in result['all'] and 
len(result['all']['rpc']) > 0: + if 'stats' in result['all']['rpc'][0]: + self.rpc_stats[stat] = {'data': result['all']['rpc'][0]['stats'], 'unit': unit} + except Exception as exc: + log.error(f"error getting rpc_stat {stat} from cluster {self.cluster}: {exc}") # # gather() gets fresh stats from the cluster as they update @@ -560,7 +581,6 @@ def gather(self): stats_data = list() for result in self.asyncobj.wait(): if not result.exception: - # log.info(f"result={result}") stats_data.append(result.result) else: log.error(f'result has exception {result.exception}') @@ -713,6 +733,7 @@ def gather(self): log.error("error processing filesystem stats for cluster {}".format(str(self.cluster))) self.get_fs_io_stats() # populate self.fs_io_stats + self.get_rpc_stats() # populate self.rpc_stats #metric_objs['weka_fs_stats'] = GaugeMetricFamily('weka_fs_stats', 'Per-Filesystem IO Stats', # labels=['cluster', 'name', 'fs_id', 'fs_name', 'stat', 'unit']) #labelvalues = [str(self.cluster), alert['type'], severity, alert['title'], host_name, host_id, node_id, @@ -731,6 +752,17 @@ def gather(self): except Exception as exc: log.error(f"Error in processing per-fs stats: {exc}") + try: + for stat_name, stat_info in self.rpc_stats.items(): + for key, value in stat_info['data'].items(): + if value is not None: + # key is like "RPC_ROUNDTRIP_AVG[method: writeWithChecksums]" — extract method name + method = key.split('[method: ')[-1].rstrip(']') if '[method: ' in key else key + metric_objs['weka_rpc_stats'].add_metric( + [str(self.cluster), stat_name, method, stat_info['unit']], value) + except Exception as exc: + log.error(f"Error in processing rpc stats: {exc}") + log.debug(f"alerts cluster={str(self.cluster)}") try: for alert in self.wekadata["alerts"]: diff --git a/export.yml b/export.yml index bfd709a..a076e7d 100644 --- a/export.yml +++ b/export.yml @@ -278,3 +278,19 @@ stats: # SSD_WRITE_LATENCY: microsecs # network: # PUMPS_TXQ_FULL: times + +# RPC stats: collected per-process 
with --param="method:*", broken out by RPC method name. +# Results appear as the 'weka_rpc_stats' metric with a 'method' label. +# +# Equivalent CLI command: +# weka stats --show-internal --stat RPC_ROUNDTRIP_AVG --interval 60 --param="method:*" -s value --per-process +# +# Example Prometheus query: +# weka_rpc_stats{stat="RPC_ROUNDTRIP_AVG", method="writeWithChecksums"} +# +# Format: <stat_name>: <unit> + +#rpc_stats: +# RPC_ROUNDTRIP_AVG: microsecs +# RPC_RATE: count +# unit: count