Mercurial > sci
comparison sciworkerd.c @ 3:215c0c3b3609
misc: use JSON everywhere (scictl/sciwebd)
author | David Demelier <markand@malikania.fr> |
---|---|
date | Mon, 14 Jun 2021 22:08:24 +0200 |
parents | 5fa3d2f479b2 |
children | eb76429ce112 |
comparison
equal
deleted
inserted
replaced
2:5fa3d2f479b2 | 3:215c0c3b3609 |
---|---|
16 | 16 |
17 #include <curl/curl.h> | 17 #include <curl/curl.h> |
18 #include <jansson.h> | 18 #include <jansson.h> |
19 | 19 |
20 #include "config.h" | 20 #include "config.h" |
21 #include "job.h" | |
22 #include "log.h" | 21 #include "log.h" |
23 #include "project.h" | 22 #include "types.h" |
24 #include "util.h" | 23 #include "util.h" |
24 | |
25 #define TAG_MAX 256 | |
25 | 26 |
26 enum taskst { | 27 enum taskst { |
27 TASKST_PENDING, /* not started yet. */ | 28 TASKST_PENDING, /* not started yet. */ |
28 TASKST_RUNNING, /* currently running. */ | 29 TASKST_RUNNING, /* currently running. */ |
29 TASKST_COMPLETED, /* completed but not synced yet. */ | 30 TASKST_COMPLETED, /* completed but not synced yet. */ |
32 | 33 |
33 struct task { | 34 struct task { |
34 enum taskst status; | 35 enum taskst status; |
35 pid_t child; | 36 pid_t child; |
36 int pipe[2]; | 37 int pipe[2]; |
37 int retcode; | 38 int exitcode; |
38 struct job job; | 39 int job_id; |
40 int project_id; | |
41 char job_tag[TAG_MAX]; | |
39 char out[SCI_CONSOLE_MAX]; | 42 char out[SCI_CONSOLE_MAX]; |
40 char script[PATH_MAX]; | 43 char script[PATH_MAX]; |
41 int scriptfd; | 44 int scriptfd; |
42 TAILQ_ENTRY(task) link; | 45 TAILQ_ENTRY(task) link; |
43 }; | 46 }; |
68 .worker = "default", | 71 .worker = "default", |
69 .maxbuilds = 4 | 72 .maxbuilds = 4 |
70 }; | 73 }; |
71 | 74 |
72 static struct tasks tasks = TAILQ_HEAD_INITIALIZER(tasks); | 75 static struct tasks tasks = TAILQ_HEAD_INITIALIZER(tasks); |
76 static struct worker worker; | |
73 | 77 |
74 #if 0 | 78 #if 0 |
75 static int sigpipe[2]; | 79 static int sigpipe[2]; |
76 #endif | 80 #endif |
77 | 81 |
107 } | 111 } |
108 | 112 |
109 static void | 113 static void |
110 destroy(struct task *tk) | 114 destroy(struct task *tk) |
111 { | 115 { |
112 log_debug("destroying task %lld", tk->job.id); | 116 log_debug("destroying task %d", tk->job_id); |
113 | 117 |
114 if (tk->pipe[0]) | 118 if (tk->pipe[0]) |
115 close(tk->pipe[0]); | 119 close(tk->pipe[0]); |
116 if (tk->pipe[1]) | 120 if (tk->pipe[1]) |
117 close(tk->pipe[1]); | 121 close(tk->pipe[1]); |
139 /* Child. */ | 143 /* Child. */ |
140 dup2(tk->pipe[1], STDOUT_FILENO); | 144 dup2(tk->pipe[1], STDOUT_FILENO); |
141 dup2(tk->pipe[1], STDERR_FILENO); | 145 dup2(tk->pipe[1], STDERR_FILENO); |
142 close(tk->pipe[0]); | 146 close(tk->pipe[0]); |
143 close(tk->pipe[1]); | 147 close(tk->pipe[1]); |
144 log_debug("spawn: running process (%lld) %s", tk->child, tk->script); | 148 log_debug("spawn: running process (%lld) %s", |
149 (long long int)tk->child, tk->script); | |
145 | 150 |
146 tk->status = TASKST_RUNNING; | 151 tk->status = TASKST_RUNNING; |
147 | 152 |
148 if (execl(tk->script, tk->script, tk->job.tag, NULL) < 0) { | 153 if (execl(tk->script, tk->script, tk->job_tag, NULL) < 0) { |
149 tk->status = TASKST_PENDING; | 154 tk->status = TASKST_PENDING; |
150 log_warn("exec %s: %s", tk->script, strerror(errno)); | 155 log_warn("exec %s: %s", tk->script, strerror(errno)); |
151 exit(0); | 156 exit(0); |
152 } | 157 } |
153 break; | 158 break; |
211 | 216 |
212 if ((tk = find_by_pid(sinfo->si_pid))) { | 217 if ((tk = find_by_pid(sinfo->si_pid))) { |
213 log_debug("process %lld completed", (long long int)sinfo->si_pid); | 218 log_debug("process %lld completed", (long long int)sinfo->si_pid); |
214 close(tk->pipe[1]); | 219 close(tk->pipe[1]); |
215 tk->status = TASKST_COMPLETED; | 220 tk->status = TASKST_COMPLETED; |
216 tk->retcode = status; | 221 tk->exitcode = status; |
217 tk->pipe[1] = 0; | 222 tk->pipe[1] = 0; |
218 } | 223 } |
219 | 224 |
220 #if 0 | 225 #if 0 |
221 /* | 226 /* |
226 if (write(sigpipe[1], &r, sizeof (r)) < 0) | 231 if (write(sigpipe[1], &r, sizeof (r)) < 0) |
227 err(1, "write"); | 232 err(1, "write"); |
228 #endif | 233 #endif |
229 } | 234 } |
230 | 235 |
236 static const char * | |
237 uploadenc(const struct task *tk) | |
238 { | |
239 json_t *doc; | |
240 | |
241 struct jobresult res = {0}; | |
242 char *dump; | |
243 | |
244 res.job_id = tk->job_id; | |
245 res.exitcode = tk->exitcode; | |
246 res.log = tk->out; | |
247 res.worker_id = worker.id; | |
248 | |
249 doc = jobresult_to(&res, 1); | |
250 dump = json_dumps(doc, JSON_COMPACT); | |
251 | |
252 json_decref(doc); | |
253 | |
254 return dump; | |
255 } | |
256 | |
257 static size_t | |
258 getcb(char *in, size_t n, size_t w, FILE *fp) | |
259 { | |
260 if (fwrite(in, n, w, fp) != w) | |
261 return log_warn("get: %s", strerror(errno)), 0; | |
262 | |
263 return w; | |
264 } | |
265 | |
266 static json_t * | |
267 get(const char *topic, const char *url) | |
268 { | |
269 CURL *curl; | |
270 CURLcode code; | |
271 | |
272 json_t *doc; | |
273 json_error_t error; | |
274 | |
275 char buf[SCI_MSG_MAX]; | |
276 long status; | |
277 FILE *fp; | |
278 | |
279 curl = curl_easy_init(); | |
280 | |
281 if (!(fp = fmemopen(buf, sizeof (buf), "w"))) | |
282 err(1, "fmemopen"); | |
283 | |
284 curl_easy_setopt(curl, CURLOPT_URL, url); | |
285 curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L); | |
286 curl_easy_setopt(curl, CURLOPT_TIMEOUT, 3L); | |
287 curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, getcb); | |
288 curl_easy_setopt(curl, CURLOPT_WRITEDATA, fp); | |
289 | |
290 if ((code = curl_easy_perform(curl)) != CURLE_OK) | |
291 log_warn("%s: %s", topic, curl_easy_strerror(code)); | |
292 | |
293 curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status); | |
294 curl_easy_cleanup(curl); | |
295 | |
296 fclose(fp); | |
297 | |
298 if (code != CURLE_OK) | |
299 return log_warn("%s: %s", topic, curl_easy_strerror(code)), NULL; | |
300 if (status != 200) | |
301 return log_warn("%s: unexpected status code %ld", topic, status), NULL; | |
302 if (!(doc = json_loads(buf, 0, &error))) | |
303 return log_warn("%s: %s", topic, error.text), NULL; | |
304 | |
305 return doc; | |
306 } | |
307 | |
308 static size_t | |
309 silent(char *in, size_t n, size_t w, void *data) | |
310 { | |
311 (void)in; | |
312 (void)n; | |
313 (void)data; | |
314 | |
315 return w; | |
316 } | |
317 | |
318 static void | |
319 upload(struct task *tk) | |
320 { | |
321 CURL *curl; | |
322 CURLcode code; | |
323 long status; | |
324 | |
325 curl = curl_easy_init(); | |
326 curl_easy_setopt(curl, CURLOPT_URL, makeurl("api/v1/jobs")); | |
327 curl_easy_setopt(curl, CURLOPT_TIMEOUT, 3L); | |
328 curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L); | |
329 curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, silent); | |
330 curl_easy_setopt(curl, CURLOPT_POSTFIELDS, uploadenc(tk)); | |
331 code = curl_easy_perform(curl); | |
332 | |
333 /* | |
334 * If we fail to upload data, we put the result into syncing mode so | |
335 * that we retry later without redoing the job over and over | |
336 */ | |
337 tk->status = TASKST_SYNCING; | |
338 | |
339 if (code != CURLE_OK) | |
340 log_warn("upload: %s", curl_easy_strerror(code)); | |
341 else { | |
342 curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status); | |
343 | |
344 if (status != 200) | |
345 log_warn("upload: unexpected return code: %ld", status); | |
346 else | |
347 destroy(tk); | |
348 } | |
349 | |
350 curl_easy_cleanup(curl); | |
351 } | |
352 | |
353 static inline void | |
354 finished(struct task *tk) | |
355 { | |
356 log_info("task %d: completed with exit code %d", tk->child, tk->exitcode); | |
357 printf("== OUTPUT ==\n"); | |
358 puts(tk->out); | |
359 upload(tk); | |
360 } | |
361 | |
362 static inline int | |
363 pending(int id) | |
364 { | |
365 struct task *t; | |
366 | |
367 TAILQ_FOREACH(t, &tasks, link) | |
368 if (t->job_id == id) | |
369 return 1; | |
370 | |
371 return 0; | |
372 } | |
373 | |
374 static void | |
375 queue(int id, int project_id, const char *tag) | |
376 { | |
377 struct task *tk; | |
378 | |
379 log_info("queued job build (%d) for tag %s\n", id, tag); | |
380 | |
381 tk = util_calloc(1, sizeof (*tk)); | |
382 tk->job_id = id; | |
383 tk->project_id = project_id; | |
384 strlcpy(tk->job_tag, tag, sizeof (tk->job_tag)); | |
385 | |
386 TAILQ_INSERT_TAIL(&tasks, tk, link); | |
387 } | |
388 | |
389 static void | |
390 merge(json_t *doc) | |
391 { | |
392 struct job jobs[SCI_JOB_LIST_MAX]; | |
393 ssize_t jobsz; | |
394 | |
395 if ((jobsz = job_from(jobs, UTIL_SIZE(jobs), doc)) < 0) | |
396 log_warn("fetchjobs: %s", strerror(errno)); | |
397 else { | |
398 for (ssize_t i = 0; i < jobsz; ++i) { | |
399 if (!pending(jobs[i].id)) | |
400 queue(jobs[i].id, jobs[i].project_id, jobs[i].tag); | |
401 } | |
402 } | |
403 | |
404 json_decref(doc); | |
405 } | |
406 | |
407 static void | |
408 fetchjobs(void) | |
409 { | |
410 json_t *doc; | |
411 | |
412 if (!(doc = get("fetch", makeurl("api/v1/jobs/%s", config.worker)))) | |
413 log_warn("unable to retrieve jobs"); | |
414 else | |
415 merge(doc); | |
416 } | |
417 | |
418 /* | |
419 * This function reads stdout/stderr pipe from child and optionally remove them | |
420 * if they have completed. | |
421 */ | |
422 static void | |
423 readall(struct fds *fds) | |
424 { | |
425 struct task *tk; | |
426 char buf[BUFSIZ]; | |
427 ssize_t nr; | |
428 | |
429 for (size_t i = 0; i < fds->listsz; ++i) { | |
430 if (fds->list[i].revents == 0) | |
431 continue; | |
432 if (!(tk = find_by_fd(fds->list[i].fd))) | |
433 continue; | |
434 | |
435 /* Read stdout/stderr from children pipe. */ | |
436 if ((nr = read(fds->list[i].fd, buf, sizeof (buf) - 1)) <= 0) | |
437 tk->status = TASKST_SYNCING; | |
438 else { | |
439 buf[nr] = 0; | |
440 strlcat(tk->out, buf, sizeof (tk->out)); | |
441 } | |
442 } | |
443 } | |
444 | |
445 /* | |
446 * Retrieve status code from spawned process complete or upload again if they | |
447 * failed to sync. | |
448 */ | |
449 static void | |
450 flushall(void) | |
451 { | |
452 struct task *tk, *tmp; | |
453 | |
454 TAILQ_FOREACH_SAFE(tk, &tasks, link, tmp) | |
455 if (tk->status == TASKST_SYNCING) | |
456 upload(tk); | |
457 } | |
458 | |
459 static int | |
460 extract(struct task *tk, json_t *doc) | |
461 { | |
462 struct project proj; | |
463 size_t len; | |
464 | |
465 if (project_from(&proj, 1, doc) < 0) { | |
466 json_decref(doc); | |
467 log_warn("fetchproject: %s", strerror(errno)); | |
468 return -1; | |
469 } | |
470 | |
471 len = strlen(proj.script); | |
472 | |
473 if ((size_t)write(tk->scriptfd, proj.script, len) != len) { | |
474 json_decref(doc); | |
475 log_warn("fetchproject: %s", strerror(errno)); | |
476 return -1; | |
477 } | |
478 | |
479 /* Close so we can finally spawn it. */ | |
480 close(tk->scriptfd); | |
481 tk->scriptfd = 0; | |
482 | |
483 return 0; | |
484 } | |
485 | |
486 static int | |
487 fetchproject(struct task *tk) | |
488 { | |
489 json_t *doc; | |
490 | |
491 if (!(doc = get("fetchproject", makeurl("api/v1/projects/%d", tk->project_id)))) | |
492 return -1; | |
493 | |
494 return extract(tk, doc); | |
495 } | |
496 | |
497 /* | |
498 * Create a task to run the script. This will retrieve the project script code | |
499 * at this moment and put it in a temporary file. | |
500 */ | |
501 static void | |
502 createtask(struct task *tk) | |
503 { | |
504 if (tk->status != TASKST_PENDING) | |
505 return; | |
506 | |
507 log_debug("creating task (id=%d, tag=%s)", tk->job_id, tk->job_tag); | |
508 snprintf(tk->script, sizeof (tk->script), "/tmp/sciworkerd-%d-XXXXXX", tk->job_id); | |
509 | |
510 if ((tk->scriptfd = mkstemp(tk->script)) < 0 || | |
511 fchmod(tk->scriptfd, S_IRUSR | S_IWUSR | S_IXUSR) < 0) { | |
512 unlink(tk->script); | |
513 log_warn("%s", strerror(errno)); | |
514 return; | |
515 } | |
516 | |
517 if (fetchproject(tk) < 0) { | |
518 unlink(tk->script); | |
519 close(tk->scriptfd); | |
520 tk->scriptfd = 0; | |
521 } else | |
522 spawn(tk); | |
523 } | |
524 | |
525 /* | |
526 * Start all pending tasks if the limit of running tasks is not reached. | |
527 */ | |
528 static void | |
529 startall(void) | |
530 { | |
531 size_t nrunning = 0; | |
532 struct task *tk; | |
533 | |
534 TAILQ_FOREACH(tk, &tasks, link) | |
535 if (tk->status == TASKST_RUNNING) | |
536 ++nrunning; | |
537 | |
538 if (nrunning >= (size_t)config.maxbuilds) | |
539 log_debug("not spawning new process because limit is reached"); | |
540 else { | |
541 tk = TAILQ_FIRST(&tasks); | |
542 | |
543 while (tk && nrunning++ < (size_t)config.maxbuilds) { | |
544 createtask(tk); | |
545 tk = TAILQ_NEXT(tk, link); | |
546 } | |
547 } | |
548 } | |
549 | |
550 static void | |
551 fetchworker(void) | |
552 { | |
553 json_t *doc; | |
554 | |
555 if (!(doc = get("fetchworker", makeurl("api/v1/workers/%s", config.worker))) || | |
556 worker_from(&worker, 1, doc) < 0) | |
557 errx(1, "unable to retrieve worker id"); | |
558 | |
559 log_info("worker id: %d", worker.id); | |
560 log_info("worker name: %s", worker.name); | |
561 log_info("worker description: %s", worker.desc); | |
562 | |
563 json_decref(doc); | |
564 } | |
565 | |
231 static void | 566 static void |
232 init(void) | 567 init(void) |
233 { | 568 { |
234 struct sigaction sa; | 569 struct sigaction sa; |
235 | 570 |
239 | 574 |
240 if (sigaction(SIGCHLD, &sa, NULL) < 0) | 575 if (sigaction(SIGCHLD, &sa, NULL) < 0) |
241 err(1, "sigaction"); | 576 err(1, "sigaction"); |
242 | 577 |
243 log_open("sciworkerd"); | 578 log_open("sciworkerd"); |
579 fetchworker(); | |
244 | 580 |
245 #if 0 | 581 #if 0 |
246 if (pipe(sigpipe) < 0) | 582 if (pipe(sigpipe) < 0) |
247 err(1, "pipe"); | 583 err(1, "pipe"); |
248 if ((flags = fcntl(sigpipe[1], F_GETFL, 0)) < 0 || | 584 if ((flags = fcntl(sigpipe[1], F_GETFL, 0)) < 0 || |
266 | 602 |
267 #if 0 | 603 #if 0 |
268 fds.list[0].fd = sigpipe[0]; | 604 fds.list[0].fd = sigpipe[0]; |
269 fds.list[0].events = POLLIN; | 605 fds.list[0].events = POLLIN; |
270 #endif | 606 #endif |
271 printf("fd => %zu\n", fds.listsz); | |
272 | 607 |
273 TAILQ_FOREACH(tk, &tasks, link) { | 608 TAILQ_FOREACH(tk, &tasks, link) { |
274 if (tk->status == TASKST_RUNNING || tk->status == TASKST_COMPLETED) { | 609 if (tk->status == TASKST_RUNNING || tk->status == TASKST_COMPLETED) { |
275 printf("adding %d to pollin\n", tk->pipe[0]); | |
276 fds.list[i].fd = tk->pipe[0]; | 610 fds.list[i].fd = tk->pipe[0]; |
277 fds.list[i++].events = POLLIN | POLLPRI; | 611 fds.list[i++].events = POLLIN | POLLPRI; |
278 } | 612 } |
279 } | 613 } |
280 | 614 |
281 return fds; | 615 return fds; |
282 } | |
283 | |
284 static const char * | |
285 uploadenc(const struct task *tk) | |
286 { | |
287 static char json[SCI_MSG_MAX]; | |
288 json_t *object; | |
289 | |
290 object = json_object(); | |
291 json_object_set(object, "code", json_string(tk->out)); | |
292 json_object_set(object, "id", json_integer(tk->job.id)); | |
293 json_object_set(object, "retcode", json_integer(tk->retcode)); | |
294 strlcpy(json, json_dumps(object, JSON_COMPACT), sizeof (json)); | |
295 json_decref(object); | |
296 | |
297 return json; | |
298 } | |
299 | |
300 static size_t | |
301 getcb(char *in, size_t n, size_t w, FILE *fp) | |
302 { | |
303 if (fwrite(in, n, w, fp) != w) | |
304 return log_warn("get: %s", strerror(errno)), 0; | |
305 | |
306 return w; | |
307 } | |
308 | |
309 static const char * | |
310 get(const char *topic, const char *url) | |
311 { | |
312 CURL *curl; | |
313 CURLcode code; | |
314 static char buf[SCI_MSG_MAX]; | |
315 long status; | |
316 FILE *fp; | |
317 | |
318 curl = curl_easy_init(); | |
319 | |
320 if (!(fp = fmemopen(buf, sizeof (buf), "w"))) | |
321 err(1, "fmemopen"); | |
322 | |
323 #if 0 | |
324 curl_easy_setopt(curl, CURLOPT_URL, makeurl("api/v1/script/%s", tk->job.project.name)); | |
325 #endif | |
326 curl_easy_setopt(curl, CURLOPT_URL, url); | |
327 curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L); | |
328 curl_easy_setopt(curl, CURLOPT_TIMEOUT, 3L); | |
329 curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, getcb); | |
330 curl_easy_setopt(curl, CURLOPT_WRITEDATA, fp); | |
331 | |
332 if ((code = curl_easy_perform(curl)) != CURLE_OK) | |
333 log_warn("%s: %s", topic, curl_easy_strerror(code)); | |
334 | |
335 curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status); | |
336 curl_easy_cleanup(curl); | |
337 | |
338 fclose(fp); | |
339 | |
340 if (code != CURLE_OK) | |
341 return log_warn("%s: %s", topic, curl_easy_strerror(code)), NULL; | |
342 if (status != 200) | |
343 return log_warn("%s: unexpected status code %ld", topic, status), NULL; | |
344 | |
345 return buf; | |
346 } | |
347 | |
348 static size_t | |
349 silent(char *in, size_t n, size_t w, void *data) | |
350 { | |
351 (void)in; | |
352 (void)n; | |
353 (void)data; | |
354 | |
355 return w; | |
356 } | |
357 | |
358 static void | |
359 upload(struct task *tk) | |
360 { | |
361 CURL *curl; | |
362 CURLcode code; | |
363 long status; | |
364 | |
365 curl = curl_easy_init(); | |
366 curl_easy_setopt(curl, CURLOPT_URL, makeurl("api/v1/jobs/%s", config.worker)); | |
367 curl_easy_setopt(curl, CURLOPT_TIMEOUT, 3L); | |
368 curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L); | |
369 curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, silent); | |
370 curl_easy_setopt(curl, CURLOPT_POSTFIELDS, uploadenc(tk)); | |
371 code = curl_easy_perform(curl); | |
372 | |
373 /* | |
374 * If we fail to upload data, we put the result into syncing mode so | |
375 * that we retry later without redoing the job over and over | |
376 */ | |
377 tk->status = TASKST_SYNCING; | |
378 | |
379 if (code != CURLE_OK) | |
380 log_warn("upload: %s", curl_easy_strerror(code)); | |
381 else { | |
382 curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status); | |
383 | |
384 if (status != 200) | |
385 log_warn("upload: unexpected return code: %ld", status); | |
386 else | |
387 destroy(tk); | |
388 } | |
389 | |
390 curl_easy_cleanup(curl); | |
391 } | |
392 | |
393 static inline void | |
394 finished(struct task *tk) | |
395 { | |
396 log_info("task %d: completed with exit code %d", tk->child, tk->retcode); | |
397 printf("== OUTPUT ==\n"); | |
398 puts(tk->out); | |
399 upload(tk); | |
400 } | |
401 | |
402 static inline int | |
403 pending(int64_t id) | |
404 { | |
405 struct task *t; | |
406 | |
407 TAILQ_FOREACH(t, &tasks, link) | |
408 if (t->job.id == id) | |
409 return 1; | |
410 | |
411 return 0; | |
412 } | |
413 | |
414 static void | |
415 push(int64_t id, const char *tag, const char *project) | |
416 { | |
417 struct task *tk; | |
418 | |
419 log_info("queued job build (%lld) for project %s, tag %s\n", id, project, tag); | |
420 | |
421 tk = util_calloc(1, sizeof (*tk)); | |
422 tk->job.id = id; | |
423 strlcpy(tk->job.tag, tag, sizeof (tk->job.tag)); | |
424 strlcpy(tk->job.project.name, project, sizeof (tk->job.project.name)); | |
425 | |
426 TAILQ_INSERT_TAIL(&tasks, tk, link); | |
427 } | |
428 | |
429 static void | |
430 merge(const char *str) | |
431 { | |
432 json_t *array, *obj, *id, *tag, *project; | |
433 json_error_t err; | |
434 size_t i; | |
435 | |
436 if (!(array = json_loads(str, 0, &err))) { | |
437 log_warn("fetch: failed to decode JSON: %s", err.text); | |
438 return; | |
439 } | |
440 if (!json_is_array(array)) | |
441 goto invalid; | |
442 | |
443 json_array_foreach(array, i, obj) { | |
444 if (!json_is_object(obj) || | |
445 !json_is_integer((id = json_object_get(obj, "id"))) || | |
446 !json_is_string((tag = json_object_get(obj, "tag"))) || | |
447 !json_is_string((project = json_object_get(obj, "project")))) | |
448 goto invalid; | |
449 | |
450 if (!pending(json_integer_value(id))) | |
451 push(json_integer_value(id), json_string_value(tag), | |
452 json_string_value(project)); | |
453 } | |
454 | |
455 json_decref(array); | |
456 | |
457 return; | |
458 | |
459 invalid: | |
460 log_warn("fetch: invalid JSON input"); | |
461 json_decref(array); | |
462 } | |
463 | |
464 static void | |
465 fetchjobs(void) | |
466 { | |
467 const char *json; | |
468 | |
469 if (!(json = get("fetch", makeurl("api/v1/jobs/%s", config.worker)))) | |
470 log_warn("unable to retrieve jobs"); | |
471 else | |
472 merge(json); | |
473 } | |
474 | |
475 /* | |
476 * This function reads stdout/stderr pipe from child and optionally remove them | |
477 * if they have completed. | |
478 */ | |
479 static void | |
480 readall(struct fds *fds) | |
481 { | |
482 struct task *tk; | |
483 char buf[BUFSIZ]; | |
484 ssize_t nr; | |
485 | |
486 for (size_t i = 0; i < fds->listsz; ++i) { | |
487 if (fds->list[i].revents == 0) | |
488 continue; | |
489 if (!(tk = find_by_fd(fds->list[i].fd))) | |
490 continue; | |
491 | |
492 /* Read stdout/stderr from children pipe. */ | |
493 if ((nr = read(fds->list[i].fd, buf, sizeof (buf) - 1)) <= 0) | |
494 tk->status = TASKST_SYNCING; | |
495 else { | |
496 buf[nr] = 0; | |
497 strlcat(tk->out, buf, sizeof (tk->out)); | |
498 } | |
499 } | |
500 } | |
501 | |
502 /* | |
503 * Retrieve status code from spawned process complete or upload again if they | |
504 * failed to sync. | |
505 */ | |
506 static void | |
507 flushall(void) | |
508 { | |
509 struct task *tk, *tmp; | |
510 | |
511 TAILQ_FOREACH_SAFE(tk, &tasks, link, tmp) | |
512 if (tk->status == TASKST_SYNCING) | |
513 upload(tk); | |
514 } | |
515 | |
516 static int | |
517 extract(struct task *tk, const char *json) | |
518 { | |
519 json_t *doc, *code; | |
520 json_error_t err; | |
521 size_t len; | |
522 | |
523 if (!(doc = json_loads(json, 0, &err))) { | |
524 log_warn("fetchscript: failed to decode JSON: %s", err.text); | |
525 return -1; | |
526 } | |
527 if (!json_is_object(doc) || | |
528 !json_is_string((code = json_object_get(doc, "code")))) | |
529 goto invalid; | |
530 | |
531 len = strlen(json_string_value(code)); | |
532 | |
533 if ((size_t)write(tk->scriptfd, json_string_value(code), len) != len) { | |
534 log_warn("fetchscript: %s", strerror(errno)); | |
535 json_decref(doc); | |
536 | |
537 return -1; | |
538 } | |
539 | |
540 /* Close so we can finally spawn it. */ | |
541 close(tk->scriptfd); | |
542 tk->scriptfd = 0; | |
543 | |
544 return 0; | |
545 | |
546 invalid: | |
547 log_warn("fetchscript: invalid JSON"); | |
548 json_decref(doc); | |
549 | |
550 return -1; | |
551 } | |
552 | |
553 static int | |
554 fetchscript(struct task *tk) | |
555 { | |
556 const char *json; | |
557 | |
558 if (!(json = get("fetchscript", makeurl("api/v1/script/%s", tk->job.project.name)))) | |
559 return -1; | |
560 | |
561 return extract(tk, json); | |
562 } | |
563 | |
564 static void | |
565 createtask(struct task *tk) | |
566 { | |
567 if (tk->status != TASKST_PENDING) | |
568 return; | |
569 | |
570 log_debug("creating task (id=%lld, project=%s, tag=%s)", | |
571 tk->job.id, tk->job.project.name, tk->job.tag); | |
572 | |
573 snprintf(tk->script, sizeof (tk->script), "/tmp/sciworkerd-%s-XXXXXX", | |
574 tk->job.project.name); | |
575 | |
576 if ((tk->scriptfd = mkstemp(tk->script)) < 0 || | |
577 fchmod(tk->scriptfd, S_IRUSR | S_IWUSR | S_IXUSR) < 0) { | |
578 unlink(tk->script); | |
579 log_warn("%s", strerror(errno)); | |
580 return; | |
581 } | |
582 | |
583 if (fetchscript(tk) < 0) { | |
584 unlink(tk->script); | |
585 close(tk->scriptfd); | |
586 tk->scriptfd = 0; | |
587 } else | |
588 spawn(tk); | |
589 } | |
590 | |
591 static void | |
592 startall(void) | |
593 { | |
594 size_t nrunning = 0; | |
595 struct task *tk; | |
596 | |
597 TAILQ_FOREACH(tk, &tasks, link) | |
598 if (tk->status == TASKST_RUNNING) | |
599 ++nrunning; | |
600 | |
601 if (nrunning >= (size_t)config.maxbuilds) { | |
602 log_debug("not spawning new process because limit is reached"); | |
603 } else { | |
604 tk = TAILQ_FIRST(&tasks); | |
605 | |
606 while (tk && nrunning++ < (size_t)config.maxbuilds) { | |
607 createtask(tk); | |
608 tk = TAILQ_NEXT(tk, link); | |
609 } | |
610 } | |
611 } | 616 } |
612 | 617 |
613 static void | 618 static void |
614 run(void) | 619 run(void) |
615 { | 620 { |