Mercurial > sci
annotate sciworkerd/sciworkerd.c @ 24:34cbbd215ef7
misc: add basic support for jobresults
author | David Demelier <markand@malikania.fr> |
---|---|
date | Mon, 25 Jul 2022 21:11:23 +0200 |
parents | 2cb228f23f53 |
children | dae2de19ca5d |
rev | line source |
---|---|
19 | 1 #include <errno.h> |
2 #include <poll.h> | |
3 #include <signal.h> | |
4 #include <string.h> | |
5 #include <time.h> | |
6 | |
7 #include <utlist.h> | |
8 | |
9 #include "apic.h" | |
10 #include "log.h" | |
11 #include "sciworkerd.h" | |
12 #include "task.h" | |
13 #include "types.h" | |
14 #include "util.h" | |
15 | |
24
34cbbd215ef7
misc: add basic support for jobresults
David Demelier <markand@malikania.fr>
parents:
23
diff
changeset
|
/* Prefix used for the log messages of this file. */
#define TAG "sciworkerd: "
19 | 17 |
18 struct taskentry { | |
19 struct task *task; | |
20 struct job job; | |
21 struct taskentry *next; | |
22 }; | |
23 | |
24 static struct taskentry *taskpending; | |
25 static struct taskentry *tasks; | |
26 static struct taskentry *taskfinished; | |
27 static struct worker worker; | |
28 static int run = 1; | |
29 | |
30 struct sciworkerd sciworkerd = { | |
31 .fetchinterval = 300, | |
32 .maxjobs = 4, | |
33 .timeout = 600 | |
34 }; | |
35 | |
36 static inline void | |
37 taskentry_free(struct taskentry *entry) | |
38 { | |
39 if (task_status(entry->task) == TASKSTATUS_RUNNING) { | |
40 if (task_kill(entry->task) == 0) | |
41 task_wait(entry->task); | |
42 } | |
43 | |
44 task_free(entry->task); | |
45 free(entry); | |
46 } | |
47 | |
48 static void | |
49 stop(int sign) | |
50 { | |
24
34cbbd215ef7
misc: add basic support for jobresults
David Demelier <markand@malikania.fr>
parents:
23
diff
changeset
|
51 log_info(TAG "exiting on signal %d", sign); |
19 | 52 run = 0; |
53 } | |
54 | |
55 static inline int | |
56 pending(int id) | |
57 { | |
58 const struct taskentry *iter; | |
59 | |
60 LL_FOREACH(taskpending, iter) | |
61 if (iter->job.id == id) | |
62 return 1; | |
63 | |
64 return 0; | |
65 } | |
66 | |
67 static inline void | |
68 queue(const struct job *job) | |
69 { | |
70 struct taskentry *tk; | |
71 | |
24
34cbbd215ef7
misc: add basic support for jobresults
David Demelier <markand@malikania.fr>
parents:
23
diff
changeset
|
72 log_info(TAG "queued job build (%d) for tag %s", job->id, job->tag); |
19 | 73 |
74 tk = util_calloc(1, sizeof (*tk)); | |
75 tk->task = task_new(job->tag); | |
76 memcpy(&tk->job, job, sizeof (*job)); | |
77 LL_APPEND(taskpending, tk); | |
78 } | |
79 | |
80 static void | |
23
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
81 merge(struct job *jobs, size_t jobsz) |
19 | 82 { |
83 size_t total = 0; | |
84 | |
24
34cbbd215ef7
misc: add basic support for jobresults
David Demelier <markand@malikania.fr>
parents:
23
diff
changeset
|
85 for (size_t i = 0; i < jobsz; ++i) { |
20 | 86 if (!pending(jobs[i].id)) { |
87 queue(&jobs[i]); | |
88 total++; | |
23
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
89 } else |
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
90 job_finish(&jobs[i]); |
20 | 91 } |
19 | 92 |
20 | 93 log_info(TAG "added %zu new pending tasks", total); |
19 | 94 } |
95 | |
96 /* | |
97 * Fetch jobs periodically, depending on the user setting. | |
98 */ | |
99 static void | |
100 fetch_jobs(void) | |
101 { | |
102 static time_t startup; | |
103 time_t now; | |
20 | 104 struct apic req; |
105 struct job todo[SCI_JOB_LIST_MAX]; | |
106 ssize_t todosz; | |
19 | 107 |
108 if (difftime((now = time(NULL)), startup) >= sciworkerd.fetchinterval) { | |
109 startup = now; | |
110 | |
23
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
111 log_info(TAG "fetching jobs"); |
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
112 |
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
113 if ((todosz = apic_job_todo(&req, todo, UTIL_SIZE(todo), worker.name)) < 0) |
19 | 114 log_warn(TAG "unable to fetch jobs: %s", req.error); |
20 | 115 else |
116 merge(todo, todosz); | |
117 | |
118 apic_finish(&req); | |
19 | 119 } |
120 } | |
121 | |
122 /* | |
123 * Fetch information about myself. | |
124 */ | |
125 static void | |
126 fetch_worker(void) | |
127 { | |
20 | 128 struct apic req; |
19 | 129 |
23
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
130 if (apic_worker_find(&req, &worker, sciworkerd.name) < 0) |
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
131 log_die(TAG "unable to fetch worker info: %s", req.error); |
20 | 132 |
24
34cbbd215ef7
misc: add basic support for jobresults
David Demelier <markand@malikania.fr>
parents:
23
diff
changeset
|
133 log_info("sciworkerd: worker %s (%s)", worker.name, worker.desc); |
20 | 134 apic_finish(&req); |
19 | 135 } |
136 | |
137 static inline size_t | |
138 count(const struct taskentry *head) | |
139 { | |
140 const struct taskentry *iter; | |
141 size_t tot = 0; | |
142 | |
143 LL_FOREACH(head, iter) | |
144 tot++; | |
145 | |
146 return tot; | |
147 } | |
148 | |
149 /* | |
150 * Start a task. We fetch its script code and then create the task with that | |
151 * script. | |
152 */ | |
153 static int | |
154 start(struct taskentry *entry) | |
155 { | |
23
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
156 struct apic req; |
19 | 157 struct project project; |
158 pid_t pid; | |
23
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
159 int ret = -1; |
19 | 160 |
23
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
161 if (apic_project_find(&req, &project, entry->job.project_name) < 0) |
19 | 162 return log_warn(TAG "unable to fetch project, dropping task"), -1; |
23
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
163 |
19 | 164 if (task_setup(entry->task, project.script) < 0) |
23
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
165 log_warn(TAG "unable to setup script code: %s, dropping task", strerror(errno)); |
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
166 else if ((pid = task_start(entry->task)) < 0) |
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
167 log_warn(TAG "unable to spawn task process: %s", strerror(errno)); |
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
168 else { |
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
169 log_info(TAG "task %lld spawned", (long long int)pid); |
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
170 ret = 0; |
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
171 } |
19 | 172 |
23
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
173 project_finish(&project); |
19 | 174 |
23
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
175 return ret; |
19 | 176 } |
177 | |
178 static inline void | |
179 delete(struct taskentry *entry) | |
180 { | |
181 LL_DELETE(taskpending, entry); | |
182 task_free(entry->task); | |
183 free(entry); | |
184 } | |
185 | |
186 static void | |
187 start_all(void) | |
188 { | |
189 size_t running = count(tasks); | |
190 struct taskentry *entry; | |
191 | |
24
34cbbd215ef7
misc: add basic support for jobresults
David Demelier <markand@malikania.fr>
parents:
23
diff
changeset
|
192 while (running < sciworkerd.maxjobs && (entry = taskpending)) { |
19 | 193 if (start(entry) < 0) |
194 delete(entry); | |
195 else { | |
24
34cbbd215ef7
misc: add basic support for jobresults
David Demelier <markand@malikania.fr>
parents:
23
diff
changeset
|
196 running++; |
19 | 197 LL_DELETE(taskpending, entry); |
198 LL_APPEND(tasks, entry); | |
199 } | |
200 } | |
201 } | |
202 | |
203 static void | |
204 process_all(void) | |
205 { | |
206 struct taskentry *iter, *next; | |
207 struct taskcode code; | |
208 struct pollfd *fds; | |
209 size_t fdsz, i = 0; | |
24
34cbbd215ef7
misc: add basic support for jobresults
David Demelier <markand@malikania.fr>
parents:
23
diff
changeset
|
210 pid_t pid; |
19 | 211 int ret; |
212 | |
213 /* First, read every pipes. */ | |
214 if (!(fdsz = count(tasks))) | |
215 return; | |
216 | |
217 fds = util_calloc(fdsz, sizeof (*fds)); | |
218 | |
219 for (iter = tasks; iter; iter = iter->next) | |
220 task_prepare(iter->task, &fds[i++]); | |
221 | |
222 if (poll(fds, fdsz, 5000) < 0) | |
223 log_warn("poll: %s", strerror(errno)); | |
224 | |
225 for (iter = tasks, i = 0; i < fdsz; ++i) { | |
226 next = iter->next; | |
227 | |
228 /* | |
229 * 0: EOF [wait] | |
230 * -1: error [kill + wait] | |
231 * >0: keep going [nothing] | |
232 */ | |
233 if ((ret = task_sync(iter->task, &fds[i])) < 0) { | |
234 log_warn(TAG "pipe error: %s, killing task", strerror(errno)); | |
235 | |
236 if (task_kill(iter->task) < 0) | |
237 log_warn(TAG "task kill error: %s", strerror(errno)); | |
238 } | |
239 | |
240 /* Now wait for the task to complete. */ | |
241 if (ret <= 0) { | |
24
34cbbd215ef7
misc: add basic support for jobresults
David Demelier <markand@malikania.fr>
parents:
23
diff
changeset
|
242 pid = task_pid(iter->task); |
34cbbd215ef7
misc: add basic support for jobresults
David Demelier <markand@malikania.fr>
parents:
23
diff
changeset
|
243 |
19 | 244 if (task_wait(iter->task) < 0) |
245 log_warn(TAG "task wait error: %s", strerror(errno)); | |
246 else { | |
247 code = task_code(iter->task); | |
248 | |
249 switch (task_status(iter->task)) { | |
250 case TASKSTATUS_EXITED: | |
251 log_info(TAG "task %lld exited with code %d", | |
24
34cbbd215ef7
misc: add basic support for jobresults
David Demelier <markand@malikania.fr>
parents:
23
diff
changeset
|
252 (long long int)pid, code.exitcode); |
19 | 253 break; |
254 case TASKSTATUS_KILLED: | |
255 log_info(TAG "task %lld killed with signal %d", | |
24
34cbbd215ef7
misc: add basic support for jobresults
David Demelier <markand@malikania.fr>
parents:
23
diff
changeset
|
256 (long long int)pid, code.sigcode); |
19 | 257 break; |
258 default: | |
259 break; | |
260 } | |
261 } | |
262 | |
263 /* Remove that task and push to the outgoing queue. */ | |
264 LL_DELETE(tasks, iter); | |
265 LL_APPEND(taskfinished, iter); | |
266 } | |
24
34cbbd215ef7
misc: add basic support for jobresults
David Demelier <markand@malikania.fr>
parents:
23
diff
changeset
|
267 |
34cbbd215ef7
misc: add basic support for jobresults
David Demelier <markand@malikania.fr>
parents:
23
diff
changeset
|
268 iter = next; |
19 | 269 } |
270 | |
271 free(fds); | |
272 } | |
273 | |
274 /* | |
275 * Kill all tasks that have been running for too long. | |
276 */ | |
277 static void | |
278 ghost_all(void) | |
279 { | |
280 struct taskentry *iter, *tmp; | |
281 | |
282 LL_FOREACH_SAFE(tasks, iter, tmp) { | |
283 if (difftime(time(NULL), task_uptime(iter->task)) < sciworkerd.timeout) | |
284 continue; | |
285 | |
286 /* Do not attempt to wait if kill failed to avoid lock. */ | |
287 log_info(TAG "task timeout, killing"); | |
288 | |
289 if (task_kill(iter->task) == 0) | |
290 task_wait(iter->task); | |
291 | |
292 LL_DELETE(tasks, iter); | |
293 LL_APPEND(taskfinished, iter); | |
294 } | |
295 } | |
296 | |
297 static int | |
298 publish(struct taskentry *iter) | |
299 { | |
300 // TODO: add sigcode. | |
301 struct taskcode code = task_code(iter->task); | |
302 struct jobresult res = { | |
303 .job_id = iter->job.id, | |
304 .exitcode = code.exitcode, | |
23
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
305 .log = util_strdup(task_console(iter->task)), |
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
306 .worker_name = worker.name |
19 | 307 }; |
23
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
308 struct apic req; |
24
34cbbd215ef7
misc: add basic support for jobresults
David Demelier <markand@malikania.fr>
parents:
23
diff
changeset
|
309 int ret = 0; |
19 | 310 |
24
34cbbd215ef7
misc: add basic support for jobresults
David Demelier <markand@malikania.fr>
parents:
23
diff
changeset
|
311 if (apic_jobresult_add(&req, &res) < 0) { |
19 | 312 log_warn(TAG "unable to publish task: %s", req.error); |
24
34cbbd215ef7
misc: add basic support for jobresults
David Demelier <markand@malikania.fr>
parents:
23
diff
changeset
|
313 ret = -1; |
34cbbd215ef7
misc: add basic support for jobresults
David Demelier <markand@malikania.fr>
parents:
23
diff
changeset
|
314 } |
19 | 315 |
24
34cbbd215ef7
misc: add basic support for jobresults
David Demelier <markand@malikania.fr>
parents:
23
diff
changeset
|
316 apic_finish(&req); |
23
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
317 jobresult_finish(&res); |
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
318 |
19 | 319 return ret; |
320 } | |
321 | |
322 static void | |
323 publish_all(void) | |
324 { | |
325 struct taskentry *iter, *tmp; | |
326 | |
327 LL_FOREACH_SAFE(taskfinished, iter, tmp) { | |
328 if (publish(iter) == 0) { | |
329 LL_DELETE(taskfinished, iter); | |
330 taskentry_free(iter); | |
331 } | |
332 } | |
333 } | |
334 | |
335 void | |
336 sciworkerd_init(void) | |
337 { | |
338 struct sigaction sa = {0}; | |
339 | |
340 log_open("sigworkerd"); | |
341 | |
342 sigemptyset(&sa.sa_mask); | |
343 sa.sa_handler = stop; | |
344 | |
23
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
345 util_strlcpy(apiconf.baseurl, sciworkerd.url, sizeof (apiconf.baseurl)); |
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
346 |
19 | 347 if (sigaction(SIGINT, &sa, NULL) < 0 || sigaction(SIGTERM, &sa, NULL) < 0) |
348 log_die(TAG "sigaction: %s", strerror(errno)); | |
349 | |
350 fetch_worker(); | |
351 } | |
352 | |
353 void | |
354 sciworkerd_run(void) | |
355 { | |
356 while (run) { | |
357 fetch_jobs(); | |
24
34cbbd215ef7
misc: add basic support for jobresults
David Demelier <markand@malikania.fr>
parents:
23
diff
changeset
|
358 start_all(); |
19 | 359 process_all(); |
360 ghost_all(); | |
361 publish_all(); | |
362 } | |
363 } | |
364 | |
365 void | |
366 sciworkerd_finish(void) | |
367 { | |
368 struct taskentry *iter, *tmp; | |
369 | |
370 LL_FOREACH_SAFE(taskpending, iter, tmp) | |
371 taskentry_free(iter); | |
372 LL_FOREACH_SAFE(tasks, iter, tmp) | |
373 taskentry_free(iter); | |
374 LL_FOREACH_SAFE(taskfinished, iter, tmp) | |
375 taskentry_free(iter); | |
376 } |