Mercurial > sci
annotate sciworkerd/sciworkerd.c @ 23:2cb228f23f53
misc: rework todo/jobs HTTP requests
author | David Demelier <markand@malikania.fr> |
---|---|
date | Thu, 21 Jul 2022 21:55:02 +0200 |
parents | f98ea578b1ef |
children | 34cbbd215ef7 |
rev | line source |
---|---|
19 | 1 #include <errno.h> |
2 #include <poll.h> | |
3 #include <signal.h> | |
4 #include <string.h> | |
5 #include <time.h> | |
6 | |
7 #include <utlist.h> | |
8 | |
9 #include "apic.h" | |
10 #include "log.h" | |
11 #include "sciworkerd.h" | |
12 #include "task.h" | |
13 #include "types.h" | |
14 #include "util.h" | |
15 | |
/* Prefix used for log messages emitted by the daemon. */
#define TAG "sciworkerd: "
17 | |
18 struct taskentry { | |
19 struct task *task; | |
20 struct job job; | |
21 struct taskentry *next; | |
22 }; | |
23 | |
24 static struct taskentry *taskpending; | |
25 static struct taskentry *tasks; | |
26 static struct taskentry *taskfinished; | |
27 static struct worker worker; | |
28 static int run = 1; | |
29 | |
30 struct sciworkerd sciworkerd = { | |
31 .fetchinterval = 300, | |
32 .maxjobs = 4, | |
33 .timeout = 600 | |
34 }; | |
35 | |
36 static inline void | |
37 taskentry_free(struct taskentry *entry) | |
38 { | |
39 if (task_status(entry->task) == TASKSTATUS_RUNNING) { | |
40 if (task_kill(entry->task) == 0) | |
41 task_wait(entry->task); | |
42 } | |
43 | |
44 task_free(entry->task); | |
45 free(entry); | |
46 } | |
47 | |
48 static void | |
49 stop(int sign) | |
50 { | |
51 log_info(TAG "exiting on signal %d\n", sign); | |
52 run = 0; | |
53 } | |
54 | |
55 static inline int | |
56 pending(int id) | |
57 { | |
58 const struct taskentry *iter; | |
59 | |
60 LL_FOREACH(taskpending, iter) | |
61 if (iter->job.id == id) | |
62 return 1; | |
63 | |
64 return 0; | |
65 } | |
66 | |
67 static inline void | |
68 queue(const struct job *job) | |
69 { | |
70 struct taskentry *tk; | |
71 | |
72 log_info(TAG "queued job build (%d) for tag %s\n", job->id, job->tag); | |
73 | |
74 tk = util_calloc(1, sizeof (*tk)); | |
75 tk->task = task_new(job->tag); | |
76 memcpy(&tk->job, job, sizeof (*job)); | |
77 LL_APPEND(taskpending, tk); | |
78 } | |
79 | |
80 static void | |
23
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
81 merge(struct job *jobs, size_t jobsz) |
19 | 82 { |
83 size_t total = 0; | |
84 | |
20 | 85 for (ssize_t i = 0; i < jobsz; ++i) { |
86 if (!pending(jobs[i].id)) { | |
87 queue(&jobs[i]); | |
88 total++; | |
23
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
89 } else |
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
90 job_finish(&jobs[i]); |
20 | 91 } |
19 | 92 |
20 | 93 log_info(TAG "added %zu new pending tasks", total); |
19 | 94 } |
95 | |
96 /* | |
97 * Fetch jobs periodically, depending on the user setting. | |
98 */ | |
99 static void | |
100 fetch_jobs(void) | |
101 { | |
102 static time_t startup; | |
103 time_t now; | |
20 | 104 struct apic req; |
105 struct job todo[SCI_JOB_LIST_MAX]; | |
106 ssize_t todosz; | |
19 | 107 |
108 if (difftime((now = time(NULL)), startup) >= sciworkerd.fetchinterval) { | |
109 startup = now; | |
110 | |
23
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
111 log_info(TAG "fetching jobs"); |
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
112 |
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
113 if ((todosz = apic_job_todo(&req, todo, UTIL_SIZE(todo), worker.name)) < 0) |
19 | 114 log_warn(TAG "unable to fetch jobs: %s", req.error); |
20 | 115 else |
116 merge(todo, todosz); | |
117 | |
118 apic_finish(&req); | |
19 | 119 } |
120 } | |
121 | |
122 /* | |
123 * Fetch information about myself. | |
124 */ | |
125 static void | |
126 fetch_worker(void) | |
127 { | |
20 | 128 struct apic req; |
19 | 129 |
23
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
130 if (apic_worker_find(&req, &worker, sciworkerd.name) < 0) |
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
131 log_die(TAG "unable to fetch worker info: %s", req.error); |
20 | 132 |
19 | 133 log_info("worker name: %s", worker.name); |
134 log_info("worker description: %s", worker.desc); | |
135 | |
20 | 136 apic_finish(&req); |
19 | 137 } |
138 | |
139 static inline size_t | |
140 count(const struct taskentry *head) | |
141 { | |
142 const struct taskentry *iter; | |
143 size_t tot = 0; | |
144 | |
145 LL_FOREACH(head, iter) | |
146 tot++; | |
147 | |
148 return tot; | |
149 } | |
150 | |
151 /* | |
152 * Start a task. We fetch its script code and then create the task with that | |
153 * script. | |
154 */ | |
155 static int | |
156 start(struct taskentry *entry) | |
157 { | |
23
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
158 struct apic req; |
19 | 159 struct project project; |
160 pid_t pid; | |
23
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
161 int ret = -1; |
19 | 162 |
23
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
163 if (apic_project_find(&req, &project, entry->job.project_name) < 0) |
19 | 164 return log_warn(TAG "unable to fetch project, dropping task"), -1; |
23
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
165 |
19 | 166 if (task_setup(entry->task, project.script) < 0) |
23
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
167 log_warn(TAG "unable to setup script code: %s, dropping task", strerror(errno)); |
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
168 else if ((pid = task_start(entry->task)) < 0) |
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
169 log_warn(TAG "unable to spawn task process: %s", strerror(errno)); |
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
170 else { |
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
171 log_info(TAG "task %lld spawned", (long long int)pid); |
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
172 ret = 0; |
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
173 } |
19 | 174 |
23
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
175 project_finish(&project); |
19 | 176 |
23
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
177 return ret; |
19 | 178 } |
179 | |
180 static inline void | |
181 delete(struct taskentry *entry) | |
182 { | |
183 LL_DELETE(taskpending, entry); | |
184 task_free(entry->task); | |
185 free(entry); | |
186 } | |
187 | |
188 static void | |
189 start_all(void) | |
190 { | |
191 size_t running = count(tasks); | |
192 struct taskentry *entry; | |
193 | |
194 while (running-- > 0 && (entry = taskpending)) { | |
195 if (start(entry) < 0) | |
196 delete(entry); | |
197 else { | |
198 LL_DELETE(taskpending, entry); | |
199 LL_APPEND(tasks, entry); | |
200 } | |
201 } | |
202 } | |
203 | |
204 static void | |
205 process_all(void) | |
206 { | |
207 struct taskentry *iter, *next; | |
208 struct taskcode code; | |
209 struct pollfd *fds; | |
210 size_t fdsz, i = 0; | |
211 int ret; | |
212 | |
213 /* First, read every pipes. */ | |
214 if (!(fdsz = count(tasks))) | |
215 return; | |
216 | |
217 fds = util_calloc(fdsz, sizeof (*fds)); | |
218 | |
219 for (iter = tasks; iter; iter = iter->next) | |
220 task_prepare(iter->task, &fds[i++]); | |
221 | |
222 if (poll(fds, fdsz, 5000) < 0) | |
223 log_warn("poll: %s", strerror(errno)); | |
224 | |
225 for (iter = tasks, i = 0; i < fdsz; ++i) { | |
226 next = iter->next; | |
227 | |
228 /* | |
229 * 0: EOF [wait] | |
230 * -1: error [kill + wait] | |
231 * >0: keep going [nothing] | |
232 */ | |
233 if ((ret = task_sync(iter->task, &fds[i])) < 0) { | |
234 log_warn(TAG "pipe error: %s, killing task", strerror(errno)); | |
235 | |
236 if (task_kill(iter->task) < 0) | |
237 log_warn(TAG "task kill error: %s", strerror(errno)); | |
238 } | |
239 | |
240 /* Now wait for the task to complete. */ | |
241 if (ret <= 0) { | |
242 if (task_wait(iter->task) < 0) | |
243 log_warn(TAG "task wait error: %s", strerror(errno)); | |
244 else { | |
245 code = task_code(iter->task); | |
246 | |
247 switch (task_status(iter->task)) { | |
248 case TASKSTATUS_EXITED: | |
249 log_info(TAG "task %lld exited with code %d", | |
250 (long long int)task_pid(iter->task), code.exitcode); | |
251 break; | |
252 case TASKSTATUS_KILLED: | |
253 log_info(TAG "task %lld killed with signal %d", | |
254 (long long int)task_pid(iter->task), code.sigcode); | |
255 break; | |
256 default: | |
257 break; | |
258 } | |
259 } | |
260 | |
261 /* Remove that task and push to the outgoing queue. */ | |
262 next = iter->next; | |
263 LL_DELETE(tasks, iter); | |
264 LL_APPEND(taskfinished, iter); | |
265 } | |
266 } | |
267 | |
268 free(fds); | |
269 } | |
270 | |
271 /* | |
272 * Kill all tasks that have been running for too long. | |
273 */ | |
274 static void | |
275 ghost_all(void) | |
276 { | |
277 struct taskentry *iter, *tmp; | |
278 time_t now; | |
279 | |
280 LL_FOREACH_SAFE(tasks, iter, tmp) { | |
281 if (difftime(time(NULL), task_uptime(iter->task)) < sciworkerd.timeout) | |
282 continue; | |
283 | |
284 /* Do not attempt to wait if kill failed to avoid lock. */ | |
285 log_info(TAG "task timeout, killing"); | |
286 | |
287 if (task_kill(iter->task) == 0) | |
288 task_wait(iter->task); | |
289 | |
290 LL_DELETE(tasks, iter); | |
291 LL_APPEND(taskfinished, iter); | |
292 } | |
293 } | |
294 | |
295 static int | |
296 publish(struct taskentry *iter) | |
297 { | |
298 // TODO: add sigcode. | |
299 struct taskcode code = task_code(iter->task); | |
300 struct jobresult res = { | |
301 .job_id = iter->job.id, | |
302 .exitcode = code.exitcode, | |
23
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
303 .log = util_strdup(task_console(iter->task)), |
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
304 .worker_name = worker.name |
19 | 305 }; |
23
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
306 struct apic req; |
19 | 307 json_t *doc; |
308 int ret; | |
309 | |
310 doc = jobresult_to(&res, 1); | |
311 ret = apic_post(&req, doc, "%s/api/v1/jobs", sciworkerd.url); | |
312 json_decref(doc); | |
313 | |
314 if (ret) | |
315 log_warn(TAG "unable to publish task: %s", req.error); | |
316 else | |
317 log_info(TAG "task successfully published"); | |
318 | |
23
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
319 jobresult_finish(&res); |
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
320 apic_finish(&req); |
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
321 |
19 | 322 return ret; |
323 } | |
324 | |
325 static void | |
326 publish_all(void) | |
327 { | |
328 struct taskentry *iter, *tmp; | |
329 | |
330 LL_FOREACH_SAFE(taskfinished, iter, tmp) { | |
331 if (publish(iter) == 0) { | |
332 LL_DELETE(taskfinished, iter); | |
333 taskentry_free(iter); | |
334 } | |
335 } | |
336 } | |
337 | |
338 void | |
339 sciworkerd_init(void) | |
340 { | |
341 struct sigaction sa = {0}; | |
342 | |
343 log_open("sigworkerd"); | |
344 | |
345 sigemptyset(&sa.sa_mask); | |
346 sa.sa_handler = stop; | |
347 | |
23
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
348 util_strlcpy(apiconf.baseurl, sciworkerd.url, sizeof (apiconf.baseurl)); |
2cb228f23f53
misc: rework todo/jobs HTTP requests
David Demelier <markand@malikania.fr>
parents:
20
diff
changeset
|
349 |
19 | 350 if (sigaction(SIGINT, &sa, NULL) < 0 || sigaction(SIGTERM, &sa, NULL) < 0) |
351 log_die(TAG "sigaction: %s", strerror(errno)); | |
352 | |
353 fetch_worker(); | |
354 } | |
355 | |
356 void | |
357 sciworkerd_run(void) | |
358 { | |
359 while (run) { | |
360 fetch_jobs(); | |
361 process_all(); | |
362 ghost_all(); | |
363 publish_all(); | |
364 } | |
365 } | |
366 | |
367 void | |
368 sciworkerd_finish(void) | |
369 { | |
370 struct taskentry *iter, *tmp; | |
371 | |
372 LL_FOREACH_SAFE(taskpending, iter, tmp) | |
373 taskentry_free(iter); | |
374 LL_FOREACH_SAFE(tasks, iter, tmp) | |
375 taskentry_free(iter); | |
376 LL_FOREACH_SAFE(taskfinished, iter, tmp) | |
377 taskentry_free(iter); | |
378 } |