Skip to content
This repository was archived by the owner on Mar 13, 2026. It is now read-only.

Commit 5977cb3

Browse files
authored
fix: improvements in monitor queued jobs (#19)
* change float to int
* refactor: add class GithubJob and store into memory
* add job to memory when in_progress, show queued job details
* disable info messages for apscheduler
* filter by queued jobs only
* fix datetime timedelta
* lint changes
1 parent 7a34c92 commit 5977cb3

2 files changed

Lines changed: 123 additions & 53 deletions

File tree

src/app.py

Lines changed: 46 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@
88

99

1010
from const import GithubHeaders, LOGGING_CONFIG
11-
from utils import parse_datetime, dict_to_logfmt
11+
from github import GithubJob
12+
from utils import dict_to_logfmt
1213

1314
dictConfig(LOGGING_CONFIG)
1415

@@ -22,6 +23,7 @@
2223
if hasattr(logging, loglevel_flask):
2324
loglevel_flask = getattr(logging, loglevel_flask)
2425
log.setLevel(loglevel_flask)
26+
logging.getLogger('apscheduler.executors.default').setLevel(logging.WARNING)
2527

2628
jobs = dict()
2729

@@ -48,85 +50,73 @@ def validate_origin_github():
4850

4951

5052
def process_workflow_job():
51-
job = request.get_json()
52-
53-
job_id = job["workflow_job"]["id"]
54-
run_id = job["workflow_job"]["run_id"]
55-
job_name = job["workflow_job"]["name"].replace("\n", " ")
56-
workflow = job["workflow_job"]["workflow_name"]
57-
time_start = parse_datetime(job["workflow_job"]["started_at"])
58-
branch = job["workflow_job"].get("head_branch", "")
59-
repository = job["repository"]["full_name"]
60-
repository_private = job["repository"]["private"]
61-
action = job["action"]
62-
conclusion = job["workflow_job"].get("conclusion")
63-
requestor = job.get("sender", {}).get("login")
64-
runner_name = job["workflow_job"]["runner_name"]
65-
runner_group_name = job["workflow_job"]["runner_group_name"]
66-
runner_public = runner_group_name == "GitHub Actions"
53+
job = GithubJob(request.get_json())
6754

6855
context_details = {
69-
"action": action,
70-
"repository": repository,
71-
"branch": branch,
72-
"job_id": job_id,
73-
"run_id": run_id,
74-
"job_name": job_name,
75-
"workflow": workflow,
76-
"requestor": requestor,
56+
"action": job.action,
57+
"repository": job.repository,
58+
"branch": job.branch,
59+
"job_id": job.id,
60+
"run_id": job.run_id,
61+
"job_name": job.name,
62+
"workflow": job.workflow,
63+
"requestor": job.requestor,
7764
}
7865

79-
if action == "queued":
80-
# add to memory as timestamp
81-
jobs[job_id] = int(time_start.timestamp())
66+
if job.action == "queued":
67+
# add to memory
68+
jobs[job.id] = job
8269

83-
elif action == "in_progress":
84-
job_requested = jobs.get(job_id)
70+
elif job.action == "in_progress":
71+
job_requested = jobs.get(job.id)
8572
time_to_start = None
8673
if not job_requested:
87-
app.logger.warning(f"Job {job_id} is {action} but not stored!")
74+
app.logger.warning(f"Job {job.id} is {job.action} but not stored!")
8875
else:
89-
if time_start < datetime.fromtimestamp(job_requested):
90-
app.logger.error(f"Job {job_id} was in progress before being queued")
91-
del jobs[job_id]
76+
if job.time_start < job_requested.time_start:
77+
app.logger.error(f"Job {job.id} was in progress before being queued")
78+
del jobs[job.id]
9279
else:
9380
time_to_start = (
94-
time_start - datetime.fromtimestamp(job_requested)
81+
job.time_start - job_requested.time_start
9582
).seconds
9683

9784
context_details = {
9885
**context_details,
99-
"runner_name": runner_name,
100-
"runner_public": runner_public,
101-
"repository_private": repository_private,
86+
"runner_name": job.runner_name,
87+
"runner_public": job.runner_public,
88+
"repository_private": job.repository_private,
10289
}
10390

10491
if time_to_start:
10592
context_details["time_to_start"] = time_to_start
10693

107-
elif action == "completed":
108-
job_requested = jobs.get(job_id)
94+
# update job from memory
95+
jobs[job.id] = job
96+
97+
elif job.action == "completed":
98+
job_requested = jobs.get(job.id)
10999
if not job_requested:
110-
app.logger.warning(f"Job {job_id} is {action} but not stored!")
100+
app.logger.warning(f"Job {job.id} is {job.action} but not stored!")
111101
time_to_finish = 0
112102
else:
113103
time_to_finish = (
114-
parse_datetime(job["workflow_job"]["completed_at"]) - time_start
104+
job.time_completed - job.time_start
115105
).seconds
116106
# delete from memory
117-
del jobs[job_id]
107+
del jobs[job.id]
118108

119109
context_details = {
120110
**context_details,
121-
"runner_name": runner_name,
111+
"runner_name": job.runner_name,
122112
"time_to_finish": time_to_finish,
123-
"conclusion": conclusion,
113+
"conclusion": job.conclusion,
124114
}
125115

126116
else:
127-
app.logger.warning(f"Unknown action {action}, removing from memory")
128-
if job_id in jobs:
129-
del jobs[job_id]
117+
app.logger.warning(f"Unknown action {job.action}, removing from memory")
118+
if job.id in jobs:
119+
del jobs[job.id]
130120
context_details = None
131121

132122
if context_details:
@@ -136,21 +126,24 @@ def process_workflow_job():
136126

137127
@scheduler.task('interval', id='monitor_queued', seconds=30)
138128
def monitor_queued_jobs():
139-
""" Return the job that has been queued and not starting for long time. """
129+
"""Return the job that has been queued and not starting for long time."""
140130
app.logger.debug("Starting monitor_queued_jobs")
141131
if not jobs:
142132
return
143133

144-
job_id, time_start = min(jobs.items(), key=lambda x: x[1])
145-
delay = datetime.now().timestamp() - time_start
134+
queued_jobs = [job for job in jobs if job.action == "queued"]
135+
job = min(queued_jobs, key=lambda x: x.time_start)
136+
delay = (datetime.now() - job.time_start).seconds
146137

147138
if delay <= int(os.getenv("QUEUED_JOBS_DELAY_THRESHOLD", 150)):
148139
return
149140

150141
context_details = {
151142
"action": "monitor_queued",
152-
"job_id": job_id,
153-
"started_at": time_start,
143+
"job_id": job.id,
144+
"job_name": job.name,
145+
"repository": job.repository,
146+
"started_at": job.time_start,
154147
"delay": delay,
155148
}
156149

src/github.py

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
from utils import parse_datetime
2+
3+
4+
class GithubJob:
    """Read-only wrapper around a GitHub Actions ``workflow_job`` webhook payload.

    Exposes the nested fields of the raw JSON body as properties so callers
    (e.g. the webhook handler) do not have to index into the dict themselves.
    """

    def __init__(self, json_body: dict):
        """Store the parsed webhook payload.

        :param json_body: the decoded JSON body of the webhook request
            (a dict, e.g. the result of ``request.get_json()``) — not a string.
        """
        self.data = json_body

    @property
    def id(self):
        """Unique id of the workflow job."""
        return self.data["workflow_job"]["id"]

    @property
    def job_id(self):
        """Alias of :attr:`id`."""
        return self.id

    @property
    def run_id(self):
        """Id of the workflow run this job belongs to."""
        return self.data["workflow_job"]["run_id"]

    @property
    def name(self):
        # Job names may contain newlines; flatten them so log lines stay intact.
        return self.data["workflow_job"]["name"].replace("\n", " ")

    @property
    def job_name(self):
        """Alias of :attr:`name`."""
        return self.name

    @property
    def workflow(self):
        """Name of the workflow that spawned this job."""
        return self.data["workflow_job"]["workflow_name"]

    @property
    def time_start(self):
        """Datetime the job started, parsed from ``started_at``."""
        return parse_datetime(self.data["workflow_job"]["started_at"])

    @property
    def time_completed(self):
        """Datetime the job completed, parsed from ``completed_at``.

        Raises ``KeyError`` if the payload has no ``completed_at`` field
        (i.e. the job has not completed yet).
        """
        return parse_datetime(self.data["workflow_job"]["completed_at"])

    @property
    def branch(self):
        """Head branch of the job; empty string when absent."""
        return self.data["workflow_job"].get("head_branch", "")

    @property
    def repository(self):
        """Full ``owner/name`` of the repository."""
        return self.data["repository"]["full_name"]

    @property
    def repository_private(self):
        """Whether the repository is private."""
        return self.data["repository"]["private"]

    @property
    def action(self):
        """Webhook action, e.g. ``queued``, ``in_progress`` or ``completed``."""
        return self.data["action"]

    @property
    def conclusion(self):
        """Job conclusion, or ``None`` while the job is still running."""
        return self.data["workflow_job"].get("conclusion")

    @property
    def requestor(self):
        """Login of the event sender, or ``None`` when not present."""
        return self.data.get("sender", {}).get("login")

    @property
    def runner_name(self):
        """Name of the runner assigned to the job."""
        return self.data["workflow_job"]["runner_name"]

    @property
    def runner_group_name(self):
        """Name of the runner group the job runs in."""
        return self.data["workflow_job"]["runner_group_name"]

    @property
    def runner_public(self):
        # GitHub-hosted runners report the "GitHub Actions" runner group.
        return self.runner_group_name == "GitHub Actions"

    def __str__(self):
        return f"<{self.id}@{self.name}>"

0 commit comments

Comments
 (0)