Mercurial > sci
comparison sciworkerd/sciworkerd.c @ 19:de4bf839b565
misc: revamp SQL
author | David Demelier <markand@malikania.fr> |
---|---|
date | Fri, 15 Jul 2022 11:11:48 +0200 |
parents | |
children | f98ea578b1ef |
comparison
equal
deleted
inserted
replaced
18:600204c31bf0 | 19:de4bf839b565 |
---|---|
1 #include <errno.h> | |
2 #include <poll.h> | |
3 #include <signal.h> | |
4 #include <string.h> | |
5 #include <time.h> | |
6 | |
7 #include <utlist.h> | |
8 | |
9 #include "apic.h" | |
10 #include "log.h" | |
11 #include "sciworkerd.h" | |
12 #include "task.h" | |
13 #include "types.h" | |
14 #include "util.h" | |
15 | |
/*
 * Prefix used by most log messages in this file.
 *
 * NOTE(review): spelled "sigworkerd" while the file is sciworkerd.c; looks
 * like a typo but it is a runtime string, confirm before changing.
 */
#define TAG "sigworkerd: "

/*
 * Singly linked list node (handled with the utlist LL_* macros) pairing a job
 * with the task object created for it.
 */
struct taskentry {
	struct task *task;      /* task handle obtained from task_new() */
	struct job job;         /* copy of the job that was queued */
	struct taskentry *next; /* next node, maintained by the LL_* macros */
};
23 | |
24 static struct taskentry *taskpending; | |
25 static struct taskentry *tasks; | |
26 static struct taskentry *taskfinished; | |
27 static struct worker worker; | |
28 static int run = 1; | |
29 | |
30 struct sciworkerd sciworkerd = { | |
31 .fetchinterval = 300, | |
32 .maxjobs = 4, | |
33 .timeout = 600 | |
34 }; | |
35 | |
36 static inline void | |
37 taskentry_free(struct taskentry *entry) | |
38 { | |
39 if (task_status(entry->task) == TASKSTATUS_RUNNING) { | |
40 if (task_kill(entry->task) == 0) | |
41 task_wait(entry->task); | |
42 } | |
43 | |
44 task_free(entry->task); | |
45 free(entry); | |
46 } | |
47 | |
48 static void | |
49 stop(int sign) | |
50 { | |
51 log_info(TAG "exiting on signal %d\n", sign); | |
52 run = 0; | |
53 } | |
54 | |
55 static inline int | |
56 pending(int id) | |
57 { | |
58 const struct taskentry *iter; | |
59 | |
60 LL_FOREACH(taskpending, iter) | |
61 if (iter->job.id == id) | |
62 return 1; | |
63 | |
64 return 0; | |
65 } | |
66 | |
67 static inline void | |
68 queue(const struct job *job) | |
69 { | |
70 struct taskentry *tk; | |
71 | |
72 log_info(TAG "queued job build (%d) for tag %s\n", job->id, job->tag); | |
73 | |
74 tk = util_calloc(1, sizeof (*tk)); | |
75 tk->task = task_new(job->tag); | |
76 memcpy(&tk->job, job, sizeof (*job)); | |
77 LL_APPEND(taskpending, tk); | |
78 } | |
79 | |
80 static void | |
81 merge(json_t *doc) | |
82 { | |
83 struct job jobs[SCI_JOB_LIST_MAX]; | |
84 ssize_t jobsz; | |
85 size_t total = 0; | |
86 | |
87 if ((jobsz = job_from(jobs, UTIL_SIZE(jobs), doc)) < 0) | |
88 log_warn(TAG "error while parsing jobs: %s", strerror(errno)); | |
89 else { | |
90 for (ssize_t i = 0; i < jobsz; ++i) { | |
91 if (!pending(jobs[i].id)) { | |
92 queue(&jobs[i]); | |
93 total++; | |
94 } | |
95 } | |
96 | |
97 log_info(TAG "added %zu new pending tasks", total); | |
98 } | |
99 } | |
100 | |
101 /* | |
102 * Fetch jobs periodically, depending on the user setting. | |
103 */ | |
104 static void | |
105 fetch_jobs(void) | |
106 { | |
107 static time_t startup; | |
108 time_t now; | |
109 struct apicreq req; | |
110 | |
111 if (!startup) | |
112 startup = time(NULL); | |
113 | |
114 if (difftime((now = time(NULL)), startup) >= sciworkerd.fetchinterval) { | |
115 startup = now; | |
116 | |
117 if (apic_get(&req, "%s/api/v1/%s", sciworkerd.url, sciworkerd.name) < 0) | |
118 log_warn(TAG "unable to fetch jobs: %s", req.error); | |
119 if (req.doc) { | |
120 merge(req.doc); | |
121 json_decref(req.doc); | |
122 } | |
123 } | |
124 } | |
125 | |
126 /* | |
127 * Fetch information about myself. | |
128 */ | |
129 static void | |
130 fetch_worker(void) | |
131 { | |
132 struct apicreq req; | |
133 | |
134 if (apic_get(&req, "%s/api/v1/workers/%s", sciworkerd.url, sciworkerd.name) < 0) | |
135 log_warn(TAG "unable to fetch worker info: %s", req.error); | |
136 if (!req.doc) | |
137 log_die(TAG "empty worker response"); | |
138 if (worker_from(&worker, 1, req.doc) < 0) | |
139 log_die(TAG "unable to parse worker", strerror(errno)); | |
140 | |
141 log_info("worker id: %d", worker.id); | |
142 log_info("worker name: %s", worker.name); | |
143 log_info("worker description: %s", worker.desc); | |
144 | |
145 json_decref(req.doc); | |
146 } | |
147 | |
148 /* | |
149 * Fetch information about a project. | |
150 */ | |
151 static int | |
152 fetch_project(struct project *project, int id) | |
153 { | |
154 struct apicreq req; | |
155 | |
156 if (apic_get(&req, "%s/api/v1/projects/%d", id) < 0) | |
157 return log_warn(TAG "unable to fetch project info: %s", req.error), -1; | |
158 if (!req.doc) | |
159 return log_warn(TAG "empty project response"), -1; | |
160 if (project_from(project, 1, req.doc) < 0) | |
161 return log_warn(TAG "unable to parse project: %s", strerror(errno)), -1; | |
162 | |
163 return 0; | |
164 } | |
165 | |
166 static inline size_t | |
167 count(const struct taskentry *head) | |
168 { | |
169 const struct taskentry *iter; | |
170 size_t tot = 0; | |
171 | |
172 LL_FOREACH(head, iter) | |
173 tot++; | |
174 | |
175 return tot; | |
176 } | |
177 | |
178 /* | |
179 * Start a task. We fetch its script code and then create the task with that | |
180 * script. | |
181 */ | |
182 static int | |
183 start(struct taskentry *entry) | |
184 { | |
185 struct project project; | |
186 pid_t pid; | |
187 | |
188 if (fetch_project(&project, entry->job.project_id) < 0) | |
189 return log_warn(TAG "unable to fetch project, dropping task"), -1; | |
190 if (task_setup(entry->task, project.script) < 0) | |
191 return log_warn(TAG "unable to setup script code: %s, dropping task", strerror(errno)), -1; | |
192 if ((pid = task_start(entry->task)) < 0) | |
193 return log_warn(TAG "unable to spawn task process: %s", strerror(errno)), -1; | |
194 | |
195 log_info(TAG "task %lld spawned", (long long int)pid); | |
196 | |
197 return 0; | |
198 } | |
199 | |
200 static inline void | |
201 delete(struct taskentry *entry) | |
202 { | |
203 LL_DELETE(taskpending, entry); | |
204 task_free(entry->task); | |
205 free(entry); | |
206 } | |
207 | |
208 static void | |
209 start_all(void) | |
210 { | |
211 size_t running = count(tasks); | |
212 struct taskentry *entry; | |
213 | |
214 while (running-- > 0 && (entry = taskpending)) { | |
215 if (start(entry) < 0) | |
216 delete(entry); | |
217 else { | |
218 LL_DELETE(taskpending, entry); | |
219 LL_APPEND(tasks, entry); | |
220 } | |
221 } | |
222 } | |
223 | |
224 static void | |
225 process_all(void) | |
226 { | |
227 struct taskentry *iter, *next; | |
228 struct taskcode code; | |
229 struct pollfd *fds; | |
230 size_t fdsz, i = 0; | |
231 int ret; | |
232 | |
233 /* First, read every pipes. */ | |
234 if (!(fdsz = count(tasks))) | |
235 return; | |
236 | |
237 fds = util_calloc(fdsz, sizeof (*fds)); | |
238 | |
239 for (iter = tasks; iter; iter = iter->next) | |
240 task_prepare(iter->task, &fds[i++]); | |
241 | |
242 if (poll(fds, fdsz, 5000) < 0) | |
243 log_warn("poll: %s", strerror(errno)); | |
244 | |
245 for (iter = tasks, i = 0; i < fdsz; ++i) { | |
246 next = iter->next; | |
247 | |
248 /* | |
249 * 0: EOF [wait] | |
250 * -1: error [kill + wait] | |
251 * >0: keep going [nothing] | |
252 */ | |
253 if ((ret = task_sync(iter->task, &fds[i])) < 0) { | |
254 log_warn(TAG "pipe error: %s, killing task", strerror(errno)); | |
255 | |
256 if (task_kill(iter->task) < 0) | |
257 log_warn(TAG "task kill error: %s", strerror(errno)); | |
258 } | |
259 | |
260 /* Now wait for the task to complete. */ | |
261 if (ret <= 0) { | |
262 if (task_wait(iter->task) < 0) | |
263 log_warn(TAG "task wait error: %s", strerror(errno)); | |
264 else { | |
265 code = task_code(iter->task); | |
266 | |
267 switch (task_status(iter->task)) { | |
268 case TASKSTATUS_EXITED: | |
269 log_info(TAG "task %lld exited with code %d", | |
270 (long long int)task_pid(iter->task), code.exitcode); | |
271 break; | |
272 case TASKSTATUS_KILLED: | |
273 log_info(TAG "task %lld killed with signal %d", | |
274 (long long int)task_pid(iter->task), code.sigcode); | |
275 break; | |
276 default: | |
277 break; | |
278 } | |
279 } | |
280 | |
281 /* Remove that task and push to the outgoing queue. */ | |
282 next = iter->next; | |
283 LL_DELETE(tasks, iter); | |
284 LL_APPEND(taskfinished, iter); | |
285 } | |
286 } | |
287 | |
288 free(fds); | |
289 } | |
290 | |
291 /* | |
292 * Kill all tasks that have been running for too long. | |
293 */ | |
294 static void | |
295 ghost_all(void) | |
296 { | |
297 struct taskentry *iter, *tmp; | |
298 time_t now; | |
299 | |
300 LL_FOREACH_SAFE(tasks, iter, tmp) { | |
301 if (difftime(time(NULL), task_uptime(iter->task)) < sciworkerd.timeout) | |
302 continue; | |
303 | |
304 /* Do not attempt to wait if kill failed to avoid lock. */ | |
305 log_info(TAG "task timeout, killing"); | |
306 | |
307 if (task_kill(iter->task) == 0) | |
308 task_wait(iter->task); | |
309 | |
310 LL_DELETE(tasks, iter); | |
311 LL_APPEND(taskfinished, iter); | |
312 } | |
313 } | |
314 | |
315 static int | |
316 publish(struct taskentry *iter) | |
317 { | |
318 // TODO: add sigcode. | |
319 struct taskcode code = task_code(iter->task); | |
320 struct jobresult res = { | |
321 .job_id = iter->job.id, | |
322 .exitcode = code.exitcode, | |
323 .log = task_console(iter->task), | |
324 .worker_id = worker.id | |
325 }; | |
326 struct apicreq req; | |
327 json_t *doc; | |
328 int ret; | |
329 | |
330 doc = jobresult_to(&res, 1); | |
331 ret = apic_post(&req, doc, "%s/api/v1/jobs", sciworkerd.url); | |
332 json_decref(doc); | |
333 | |
334 if (ret) | |
335 log_warn(TAG "unable to publish task: %s", req.error); | |
336 else | |
337 log_info(TAG "task successfully published"); | |
338 | |
339 return ret; | |
340 } | |
341 | |
342 static void | |
343 publish_all(void) | |
344 { | |
345 struct taskentry *iter, *tmp; | |
346 | |
347 LL_FOREACH_SAFE(taskfinished, iter, tmp) { | |
348 if (publish(iter) == 0) { | |
349 LL_DELETE(taskfinished, iter); | |
350 taskentry_free(iter); | |
351 } | |
352 } | |
353 } | |
354 | |
355 void | |
356 sciworkerd_init(void) | |
357 { | |
358 struct sigaction sa = {0}; | |
359 | |
360 log_open("sigworkerd"); | |
361 | |
362 sigemptyset(&sa.sa_mask); | |
363 sa.sa_handler = stop; | |
364 | |
365 if (sigaction(SIGINT, &sa, NULL) < 0 || sigaction(SIGTERM, &sa, NULL) < 0) | |
366 log_die(TAG "sigaction: %s", strerror(errno)); | |
367 | |
368 fetch_worker(); | |
369 } | |
370 | |
371 void | |
372 sciworkerd_run(void) | |
373 { | |
374 while (run) { | |
375 fetch_jobs(); | |
376 process_all(); | |
377 ghost_all(); | |
378 publish_all(); | |
379 } | |
380 } | |
381 | |
382 void | |
383 sciworkerd_finish(void) | |
384 { | |
385 struct taskentry *iter, *tmp; | |
386 | |
387 LL_FOREACH_SAFE(taskpending, iter, tmp) | |
388 taskentry_free(iter); | |
389 LL_FOREACH_SAFE(tasks, iter, tmp) | |
390 taskentry_free(iter); | |
391 LL_FOREACH_SAFE(taskfinished, iter, tmp) | |
392 taskentry_free(iter); | |
393 } |