@@ -21,7 +21,7 @@ def execute_sql(conn, stmt, params):
2121
2222
2323class AllocatedRscGauge :
24- def __init__ (self , name ):
24+ def __init__ (self , name , conn ):
2525 self ._gauge = Gauge (name , "A gauge of allocated ressources of running jobs over time" , ['cluster' , 'rsc' , 'project' ])
2626 self ._request_per_cluster = "SELECT foo.cluster_name, (SELECT name FROM project WHERE id= foo.project_id), " \
2727 "foo.mem, foo.cpu FROM (SELECT cluster_name, project_id, sum(memory) as mem, " \
@@ -33,27 +33,24 @@ def __init__(self, name):
3333 "FROM job " \
3434 "WHERE state='running' GROUP BY project_id) as foo"
3535
36- self ._request_possible_cluster = "SELECT DISTINCT name FROM cluster"
37- self ._request_possible_projects = "SELECT DISTINCT name FROM project"
36+ self ._request_possible_cluster = "SELECT DISTINCT name FROM cluster ORDER BY name "
37+ self ._request_possible_projects = "SELECT DISTINCT name FROM project ORDER BY name "
3838 self ._possible_combination = None
3939 self ._possible_cluster = None
4040 self ._possible_project = None
41- self ._count_to_request = 0
41+ self ._count_to_update = 10
42+ self ._create_combination_dict (conn )
4243
43- def update (self , conn ):
44- per_cluster = execute_sql (conn , self ._request_per_cluster , None )
45- total = execute_sql (conn , self ._request_total , None )
46- self ._reset_possible_combination (conn )
47- self ._set_values (per_cluster , total )
44+ def _create_combination_dict (self , conn ):
45+ """
46+ Completely rewrite the combination dict based on the occurrences of the request results without any
47+ data loss checks made on old cluster or project occurrences.
48+ """
49+ self ._possible_cluster = execute_sql (conn , self ._request_possible_cluster , None )
50+ self ._possible_cluster .append (["'%'" ])
51+ self ._possible_project = execute_sql (conn , self ._request_possible_projects , None )
4852
49- def _reset_possible_combination (self , conn ):
5053 self ._possible_combination = dict ()
51- if self ._count_to_request == 0 :
52- self ._possible_cluster = execute_sql (conn , self ._request_possible_cluster , None )
53- self ._possible_cluster .append (["'%'" ])
54- self ._possible_project = execute_sql (conn , self ._request_possible_projects , None )
55- self ._count_to_request = 10
56-
5754 for cluster in self ._possible_cluster :
5855 if cluster [0 ]:
5956 project_dict = dict ()
@@ -62,7 +59,65 @@ def _reset_possible_combination(self, conn):
6259 project_dict [project [0 ]] = True
6360 self ._possible_combination [cluster [0 ]] = project_dict
6461
65- self ._count_to_request -= 1
def _update_combination_dict(self, conn):
    """
    Refresh the cluster/project combination dict when the database content changed.

    The cluster and project name lists are queried again and compared with the
    cached ones. When they differ, the cached lists are replaced by the fresh
    ones and every (cluster, project) pair of the fresh lists is written into
    the combination dict with the value True. Existing entries are kept, so
    pairs that were already tracked are simply overwritten.

    This does NOT set the flags of untouched pairs back to True; callers still
    need to call _reset_combination_dict afterwards.

    :param conn: database connection handed to execute_sql
    """
    # NOTE(review): execute_sql is assumed to return a mutable list of row
    # sequences (the code appends to it and reads row[0]) - confirm.
    refreshed_clusters = execute_sql(conn, self._request_possible_cluster, None)
    # "'%'" is the pseudo-cluster name added next to the real cluster names.
    refreshed_clusters.append(["'%'"])
    refreshed_projects = execute_sql(conn, self._request_possible_projects, None)

    changed = (
        AllocatedRscGauge._different_instance_list(self._possible_cluster, refreshed_clusters)
        or AllocatedRscGauge._different_instance_list(self._possible_project, refreshed_projects)
    )
    if not changed:
        return

    print("=== DICT UPDATE - AVOIDABLE COST ===")

    # Store the fresh lists: otherwise the same difference would be detected
    # again on every refresh and new names would never reach the dict.
    self._possible_cluster = refreshed_clusters
    self._possible_project = refreshed_projects

    for cluster in refreshed_clusters:
        if not cluster[0]:
            continue
        # Reuse the per-cluster dict when it exists so already tracked
        # projects are preserved.
        project_dict = self._possible_combination.setdefault(cluster[0], dict())
        for project in refreshed_projects:
            if project[0]:
                project_dict[project[0]] = True
87+ def _reset_combination_dict (self ):
88+ """
89+ Set all the occurrences of the combination dict to True in order to detect which values to set to 0
90+ because they are missing in the request result
91+ """
92+ for project_dict in self ._possible_combination .values ():
93+ for project_name in project_dict .keys ():
94+ project_dict [project_name ] = True
95+
96+ @staticmethod
97+ def _different_instance_list (first , second ):
98+ """
99+ Return True if the given contains a difference in there occurrences.
100+ Used to detect if we should update the combination dictionary.
101+ Example : a new project or a new cluster appeared in the DB
102+ """
103+ length = len (first )
104+ if length != len (second ):
105+ return True
106+ else :
107+ for i in range (length ):
108+ if first [i ][0 ] != second [i ][0 ]:
109+ return True
110+ return False
111+
def update(self, conn):
    """
    Run the allocation queries and push their results into the gauge.

    Both queries are executed on every call. When the countdown has reached 0
    the combination dict is first checked against the database and the
    countdown restarts at 10. The dict flags are then reset, the values are
    written through _set_values, and the countdown is decremented.

    :param conn: database connection handed to execute_sql
    """
    cluster_rows = execute_sql(conn, self._request_per_cluster, None)
    # NOTE(review): _request_total is not assigned in the visible part of
    # __init__; presumably it is defined in the elided lines - confirm.
    total_rows = execute_sql(conn, self._request_total, None)

    refresh_due = self._count_to_update == 0
    if refresh_due:
        self._update_combination_dict(conn)
        self._count_to_update = 10

    # The flags must be reset before _set_values runs.
    self._reset_combination_dict()
    self._set_values(cluster_rows, total_rows)

    self._count_to_update = self._count_to_update - 1
66121
67122 def _set_values (self , per_cluster , total ):
68123 for row in per_cluster :
@@ -82,11 +137,18 @@ def _set_values(self, per_cluster, total):
82137 self ._gauge .labels (rsc = "cpu" , cluster = "'%'" , project = row [0 ]).set (row [2 ])
83138
84139 for cluster , project_dict in self ._possible_combination .items ():
140+ to_delete = []
85141 for project , not_used in project_dict .items ():
86142 if not_used :
143+ to_delete .append (project )
87144 self ._gauge .labels (rsc = "mem" , cluster = cluster , project = project ).set (0 )
88145 self ._gauge .labels (rsc = "cpu" , cluster = cluster , project = project ).set (0 )
89146
147+ # Maybe it was the last occurrence ever of this data. We delete it in order to keep the combination dict
148+ # as small as possible (the process may be running for entire weeks)
149+ for maybe_last_occurrence in to_delete :
150+ del project_dict [maybe_last_occurrence ]
151+
90152
91153class AllJobNodeGauge :
92154 def __init__ (self , name ):
@@ -216,7 +278,7 @@ def main():
216278 return 1
217279
218280 active_job_gauge = ActiveJobClusterGauge ('job_active_current_count' )
219- rsc_gauge = AllocatedRscGauge ('rsc_current_count' )
281+ rsc_gauge = AllocatedRscGauge ('rsc_current_count' , conn )
220282 all_job_gauge = AllJobNodeGauge ('job_all_node_count' )
221283
222284 while running :
0 commit comments