Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ class WekaCollector(object):
'WRITE_LATENCY': 'microsecs'
}

RPC_STATS = {} # populated from config['rpc_stats'] in __init__

weka_stat_list = {} # category: {{ stat:unit}, {stat:unit}}

def __init__(self, config, cluster_obj): # wekaCollector
Expand All @@ -134,7 +136,9 @@ def __init__(self, config, cluster_obj): # wekaCollector
self.cluster = cluster_obj
self.wekadata = dict()
self.fs_io_stats = dict()
self.rpc_stats = dict()
self.fs_map = dict()
self.RPC_STATS = config.get('rpc_stats') or {}

global weka_stat_list

Expand All @@ -153,6 +157,7 @@ def __init__(self, config, cluster_obj): # wekaCollector
weka_stat_list[category].update({stat: unit})
one_call_stat.append(f"{category}.{stat}")


# log.debug(f"one_call_stat={len(one_call_stat)}")

# log.debug(f"weka_stat_list={weka_stat_list}")
Expand Down Expand Up @@ -217,6 +222,9 @@ def _reset_metrics(self):
'WekaFS statistics. For more info refer to: https://docs.weka.io/usage/statistics/list-of-statistics',
labels=['cluster', 'host_name', 'host_role', 'node_id',
'node_role', 'category', 'stat', 'unit'])
metric_objs['weka_rpc_stats'] = GaugeMetricFamily('weka_rpc_stats',
'Per-RPC-method statistics',
labels=['cluster', 'stat', 'method', 'unit'])
metric_objs['weka_io_histogram'] = GaugeHistogramMetricFamily("weka_blocksize",
"weka blocksize distribution histogram",
labels=['cluster', 'host_name', 'host_role',
Expand Down Expand Up @@ -440,6 +448,19 @@ def get_fs_io_stats(self):
except Exception as exc:
log.error(f"error getting fs_io_stat, {stat} from cluster {self.cluster}: {exc}")

def get_rpc_stats(self):
self.rpc_stats = dict()
for stat, unit in self.RPC_STATS.items():
try:
result = self.cluster.call_api('stats_show', parms={
'category': 'rpc', 'stat': stat, 'interval': '1m',
'param': {'method': '*'}, 'per_process': True, 'show_internal': True})
if result is not None and 'all' in result:
if 'rpc' in result['all'] and len(result['all']['rpc']) > 0:
if 'stats' in result['all']['rpc'][0]:
self.rpc_stats[stat] = {'data': result['all']['rpc'][0]['stats'], 'unit': unit}
except Exception as exc:
log.error(f"error getting rpc_stat {stat} from cluster {self.cluster}: {exc}")

#
# gather() gets fresh stats from the cluster as they update
Expand Down Expand Up @@ -560,7 +581,6 @@ def gather(self):
stats_data = list()
for result in self.asyncobj.wait():
if not result.exception:
# log.info(f"result={result}")
stats_data.append(result.result)
else:
log.error(f'result has exception {result.exception}')
Expand Down Expand Up @@ -713,6 +733,7 @@ def gather(self):
log.error("error processing filesystem stats for cluster {}".format(str(self.cluster)))

self.get_fs_io_stats() # populate self.fs_io_stats
self.get_rpc_stats() # populate self.rpc_stats
#metric_objs['weka_fs_stats'] = GaugeMetricFamily('weka_fs_stats', 'Per-Filesystem IO Stats',
# labels=['cluster', 'name', 'fs_id', 'fs_name', 'stat', 'unit'])
#labelvalues = [str(self.cluster), alert['type'], severity, alert['title'], host_name, host_id, node_id,
Expand All @@ -731,6 +752,17 @@ def gather(self):
except Exception as exc:
log.error(f"Error in processing per-fs stats: {exc}")

try:
for stat_name, stat_info in self.rpc_stats.items():
for key, value in stat_info['data'].items():
if value is not None:
# key is like "RPC_ROUNDTRIP_AVG[method: writeWithChecksums]" — extract method name
method = key.split('[method: ')[-1].rstrip(']') if '[method: ' in key else key
metric_objs['weka_rpc_stats'].add_metric(
[str(self.cluster), stat_name, method, stat_info['unit']], value)
except Exception as exc:
log.error(f"Error in processing rpc stats: {exc}")

log.debug(f"alerts cluster={str(self.cluster)}")
try:
for alert in self.wekadata["alerts"]:
Expand Down
16 changes: 16 additions & 0 deletions export.yml
Original file line number Diff line number Diff line change
Expand Up @@ -278,3 +278,19 @@ stats:
# SSD_WRITE_LATENCY: microsecs
# network:
# PUMPS_TXQ_FULL: times

# RPC stats: collected per-process with --param="method:*", broken out by RPC method name.
# Results appear as the 'weka_rpc_stats' metric with a 'method' label.
#
# Equivalent CLI command:
# weka stats --show-internal --stat RPC_ROUNDTRIP_AVG --interval 60 --param="method:*" -s value --per-process
#
# Example Prometheus query:
# weka_rpc_stats{stat="RPC_ROUNDTRIP_AVG", method="writeWithChecksums"}
#
# Format: <STAT_NAME>: <unit>

#rpc_stats:
# RPC_ROUNDTRIP_AVG: microsecs
# RPC_RATE: count
# unit: count