comparison sciworkerd/sciworkerd.c @ 19:de4bf839b565

misc: revamp SQL
author David Demelier <markand@malikania.fr>
date Fri, 15 Jul 2022 11:11:48 +0200
parents
children f98ea578b1ef
comparison
equal deleted inserted replaced
18:600204c31bf0 19:de4bf839b565
1 #include <errno.h>
2 #include <poll.h>
3 #include <signal.h>
4 #include <string.h>
5 #include <time.h>
6
7 #include <utlist.h>
8
9 #include "apic.h"
10 #include "log.h"
11 #include "sciworkerd.h"
12 #include "task.h"
13 #include "types.h"
14 #include "util.h"
15
16 #define TAG "sigworkerd: "
17
18 struct taskentry {
19 struct task *task;
20 struct job job;
21 struct taskentry *next;
22 };
23
24 static struct taskentry *taskpending;
25 static struct taskentry *tasks;
26 static struct taskentry *taskfinished;
27 static struct worker worker;
/*
 * Main loop flag. It is cleared from the signal handler, hence it must be a
 * volatile sig_atomic_t so that the asynchronous write is well defined and
 * the main loop re-reads it on every iteration.
 */
static volatile sig_atomic_t run = 1;
29
30 struct sciworkerd sciworkerd = {
31 .fetchinterval = 300,
32 .maxjobs = 4,
33 .timeout = 600
34 };
35
36 static inline void
37 taskentry_free(struct taskentry *entry)
38 {
39 if (task_status(entry->task) == TASKSTATUS_RUNNING) {
40 if (task_kill(entry->task) == 0)
41 task_wait(entry->task);
42 }
43
44 task_free(entry->task);
45 free(entry);
46 }
47
48 static void
49 stop(int sign)
50 {
51 log_info(TAG "exiting on signal %d\n", sign);
52 run = 0;
53 }
54
55 static inline int
56 pending(int id)
57 {
58 const struct taskentry *iter;
59
60 LL_FOREACH(taskpending, iter)
61 if (iter->job.id == id)
62 return 1;
63
64 return 0;
65 }
66
67 static inline void
68 queue(const struct job *job)
69 {
70 struct taskentry *tk;
71
72 log_info(TAG "queued job build (%d) for tag %s\n", job->id, job->tag);
73
74 tk = util_calloc(1, sizeof (*tk));
75 tk->task = task_new(job->tag);
76 memcpy(&tk->job, job, sizeof (*job));
77 LL_APPEND(taskpending, tk);
78 }
79
80 static void
81 merge(json_t *doc)
82 {
83 struct job jobs[SCI_JOB_LIST_MAX];
84 ssize_t jobsz;
85 size_t total = 0;
86
87 if ((jobsz = job_from(jobs, UTIL_SIZE(jobs), doc)) < 0)
88 log_warn(TAG "error while parsing jobs: %s", strerror(errno));
89 else {
90 for (ssize_t i = 0; i < jobsz; ++i) {
91 if (!pending(jobs[i].id)) {
92 queue(&jobs[i]);
93 total++;
94 }
95 }
96
97 log_info(TAG "added %zu new pending tasks", total);
98 }
99 }
100
101 /*
102 * Fetch jobs periodically, depending on the user setting.
103 */
104 static void
105 fetch_jobs(void)
106 {
107 static time_t startup;
108 time_t now;
109 struct apicreq req;
110
111 if (!startup)
112 startup = time(NULL);
113
114 if (difftime((now = time(NULL)), startup) >= sciworkerd.fetchinterval) {
115 startup = now;
116
117 if (apic_get(&req, "%s/api/v1/%s", sciworkerd.url, sciworkerd.name) < 0)
118 log_warn(TAG "unable to fetch jobs: %s", req.error);
119 if (req.doc) {
120 merge(req.doc);
121 json_decref(req.doc);
122 }
123 }
124 }
125
126 /*
127 * Fetch information about myself.
128 */
129 static void
130 fetch_worker(void)
131 {
132 struct apicreq req;
133
134 if (apic_get(&req, "%s/api/v1/workers/%s", sciworkerd.url, sciworkerd.name) < 0)
135 log_warn(TAG "unable to fetch worker info: %s", req.error);
136 if (!req.doc)
137 log_die(TAG "empty worker response");
138 if (worker_from(&worker, 1, req.doc) < 0)
139 log_die(TAG "unable to parse worker", strerror(errno));
140
141 log_info("worker id: %d", worker.id);
142 log_info("worker name: %s", worker.name);
143 log_info("worker description: %s", worker.desc);
144
145 json_decref(req.doc);
146 }
147
148 /*
149 * Fetch information about a project.
150 */
151 static int
152 fetch_project(struct project *project, int id)
153 {
154 struct apicreq req;
155
156 if (apic_get(&req, "%s/api/v1/projects/%d", id) < 0)
157 return log_warn(TAG "unable to fetch project info: %s", req.error), -1;
158 if (!req.doc)
159 return log_warn(TAG "empty project response"), -1;
160 if (project_from(project, 1, req.doc) < 0)
161 return log_warn(TAG "unable to parse project: %s", strerror(errno)), -1;
162
163 return 0;
164 }
165
166 static inline size_t
167 count(const struct taskentry *head)
168 {
169 const struct taskentry *iter;
170 size_t tot = 0;
171
172 LL_FOREACH(head, iter)
173 tot++;
174
175 return tot;
176 }
177
178 /*
179 * Start a task. We fetch its script code and then create the task with that
180 * script.
181 */
182 static int
183 start(struct taskentry *entry)
184 {
185 struct project project;
186 pid_t pid;
187
188 if (fetch_project(&project, entry->job.project_id) < 0)
189 return log_warn(TAG "unable to fetch project, dropping task"), -1;
190 if (task_setup(entry->task, project.script) < 0)
191 return log_warn(TAG "unable to setup script code: %s, dropping task", strerror(errno)), -1;
192 if ((pid = task_start(entry->task)) < 0)
193 return log_warn(TAG "unable to spawn task process: %s", strerror(errno)), -1;
194
195 log_info(TAG "task %lld spawned", (long long int)pid);
196
197 return 0;
198 }
199
200 static inline void
201 delete(struct taskentry *entry)
202 {
203 LL_DELETE(taskpending, entry);
204 task_free(entry->task);
205 free(entry);
206 }
207
208 static void
209 start_all(void)
210 {
211 size_t running = count(tasks);
212 struct taskentry *entry;
213
214 while (running-- > 0 && (entry = taskpending)) {
215 if (start(entry) < 0)
216 delete(entry);
217 else {
218 LL_DELETE(taskpending, entry);
219 LL_APPEND(tasks, entry);
220 }
221 }
222 }
223
224 static void
225 process_all(void)
226 {
227 struct taskentry *iter, *next;
228 struct taskcode code;
229 struct pollfd *fds;
230 size_t fdsz, i = 0;
231 int ret;
232
233 /* First, read every pipes. */
234 if (!(fdsz = count(tasks)))
235 return;
236
237 fds = util_calloc(fdsz, sizeof (*fds));
238
239 for (iter = tasks; iter; iter = iter->next)
240 task_prepare(iter->task, &fds[i++]);
241
242 if (poll(fds, fdsz, 5000) < 0)
243 log_warn("poll: %s", strerror(errno));
244
245 for (iter = tasks, i = 0; i < fdsz; ++i) {
246 next = iter->next;
247
248 /*
249 * 0: EOF [wait]
250 * -1: error [kill + wait]
251 * >0: keep going [nothing]
252 */
253 if ((ret = task_sync(iter->task, &fds[i])) < 0) {
254 log_warn(TAG "pipe error: %s, killing task", strerror(errno));
255
256 if (task_kill(iter->task) < 0)
257 log_warn(TAG "task kill error: %s", strerror(errno));
258 }
259
260 /* Now wait for the task to complete. */
261 if (ret <= 0) {
262 if (task_wait(iter->task) < 0)
263 log_warn(TAG "task wait error: %s", strerror(errno));
264 else {
265 code = task_code(iter->task);
266
267 switch (task_status(iter->task)) {
268 case TASKSTATUS_EXITED:
269 log_info(TAG "task %lld exited with code %d",
270 (long long int)task_pid(iter->task), code.exitcode);
271 break;
272 case TASKSTATUS_KILLED:
273 log_info(TAG "task %lld killed with signal %d",
274 (long long int)task_pid(iter->task), code.sigcode);
275 break;
276 default:
277 break;
278 }
279 }
280
281 /* Remove that task and push to the outgoing queue. */
282 next = iter->next;
283 LL_DELETE(tasks, iter);
284 LL_APPEND(taskfinished, iter);
285 }
286 }
287
288 free(fds);
289 }
290
291 /*
292 * Kill all tasks that have been running for too long.
293 */
294 static void
295 ghost_all(void)
296 {
297 struct taskentry *iter, *tmp;
298 time_t now;
299
300 LL_FOREACH_SAFE(tasks, iter, tmp) {
301 if (difftime(time(NULL), task_uptime(iter->task)) < sciworkerd.timeout)
302 continue;
303
304 /* Do not attempt to wait if kill failed to avoid lock. */
305 log_info(TAG "task timeout, killing");
306
307 if (task_kill(iter->task) == 0)
308 task_wait(iter->task);
309
310 LL_DELETE(tasks, iter);
311 LL_APPEND(taskfinished, iter);
312 }
313 }
314
315 static int
316 publish(struct taskentry *iter)
317 {
318 // TODO: add sigcode.
319 struct taskcode code = task_code(iter->task);
320 struct jobresult res = {
321 .job_id = iter->job.id,
322 .exitcode = code.exitcode,
323 .log = task_console(iter->task),
324 .worker_id = worker.id
325 };
326 struct apicreq req;
327 json_t *doc;
328 int ret;
329
330 doc = jobresult_to(&res, 1);
331 ret = apic_post(&req, doc, "%s/api/v1/jobs", sciworkerd.url);
332 json_decref(doc);
333
334 if (ret)
335 log_warn(TAG "unable to publish task: %s", req.error);
336 else
337 log_info(TAG "task successfully published");
338
339 return ret;
340 }
341
342 static void
343 publish_all(void)
344 {
345 struct taskentry *iter, *tmp;
346
347 LL_FOREACH_SAFE(taskfinished, iter, tmp) {
348 if (publish(iter) == 0) {
349 LL_DELETE(taskfinished, iter);
350 taskentry_free(iter);
351 }
352 }
353 }
354
355 void
356 sciworkerd_init(void)
357 {
358 struct sigaction sa = {0};
359
360 log_open("sigworkerd");
361
362 sigemptyset(&sa.sa_mask);
363 sa.sa_handler = stop;
364
365 if (sigaction(SIGINT, &sa, NULL) < 0 || sigaction(SIGTERM, &sa, NULL) < 0)
366 log_die(TAG "sigaction: %s", strerror(errno));
367
368 fetch_worker();
369 }
370
371 void
372 sciworkerd_run(void)
373 {
374 while (run) {
375 fetch_jobs();
376 process_all();
377 ghost_all();
378 publish_all();
379 }
380 }
381
382 void
383 sciworkerd_finish(void)
384 {
385 struct taskentry *iter, *tmp;
386
387 LL_FOREACH_SAFE(taskpending, iter, tmp)
388 taskentry_free(iter);
389 LL_FOREACH_SAFE(tasks, iter, tmp)
390 taskentry_free(iter);
391 LL_FOREACH_SAFE(taskfinished, iter, tmp)
392 taskentry_free(iter);
393 }