comparison sciworkerd.c @ 3:215c0c3b3609

misc: use JSON everywhere (scictl/sciwebd)
author David Demelier <markand@malikania.fr>
date Mon, 14 Jun 2021 22:08:24 +0200
parents 5fa3d2f479b2
children eb76429ce112
comparison
equal deleted inserted replaced
2:5fa3d2f479b2 3:215c0c3b3609
16 16
17 #include <curl/curl.h> 17 #include <curl/curl.h>
18 #include <jansson.h> 18 #include <jansson.h>
19 19
20 #include "config.h" 20 #include "config.h"
21 #include "job.h"
22 #include "log.h" 21 #include "log.h"
23 #include "project.h" 22 #include "types.h"
24 #include "util.h" 23 #include "util.h"
24
25 #define TAG_MAX 256
25 26
26 enum taskst { 27 enum taskst {
27 TASKST_PENDING, /* not started yet. */ 28 TASKST_PENDING, /* not started yet. */
28 TASKST_RUNNING, /* currently running. */ 29 TASKST_RUNNING, /* currently running. */
29 TASKST_COMPLETED, /* completed but not synced yet. */ 30 TASKST_COMPLETED, /* completed but not synced yet. */
32 33
33 struct task { 34 struct task {
34 enum taskst status; 35 enum taskst status;
35 pid_t child; 36 pid_t child;
36 int pipe[2]; 37 int pipe[2];
37 int retcode; 38 int exitcode;
38 struct job job; 39 int job_id;
40 int project_id;
41 char job_tag[TAG_MAX];
39 char out[SCI_CONSOLE_MAX]; 42 char out[SCI_CONSOLE_MAX];
40 char script[PATH_MAX]; 43 char script[PATH_MAX];
41 int scriptfd; 44 int scriptfd;
42 TAILQ_ENTRY(task) link; 45 TAILQ_ENTRY(task) link;
43 }; 46 };
68 .worker = "default", 71 .worker = "default",
69 .maxbuilds = 4 72 .maxbuilds = 4
70 }; 73 };
71 74
72 static struct tasks tasks = TAILQ_HEAD_INITIALIZER(tasks); 75 static struct tasks tasks = TAILQ_HEAD_INITIALIZER(tasks);
76 static struct worker worker;
73 77
74 #if 0 78 #if 0
75 static int sigpipe[2]; 79 static int sigpipe[2];
76 #endif 80 #endif
77 81
107 } 111 }
108 112
109 static void 113 static void
110 destroy(struct task *tk) 114 destroy(struct task *tk)
111 { 115 {
112 log_debug("destroying task %lld", tk->job.id); 116 log_debug("destroying task %d", tk->job_id);
113 117
114 if (tk->pipe[0]) 118 if (tk->pipe[0])
115 close(tk->pipe[0]); 119 close(tk->pipe[0]);
116 if (tk->pipe[1]) 120 if (tk->pipe[1])
117 close(tk->pipe[1]); 121 close(tk->pipe[1]);
139 /* Child. */ 143 /* Child. */
140 dup2(tk->pipe[1], STDOUT_FILENO); 144 dup2(tk->pipe[1], STDOUT_FILENO);
141 dup2(tk->pipe[1], STDERR_FILENO); 145 dup2(tk->pipe[1], STDERR_FILENO);
142 close(tk->pipe[0]); 146 close(tk->pipe[0]);
143 close(tk->pipe[1]); 147 close(tk->pipe[1]);
144 log_debug("spawn: running process (%lld) %s", tk->child, tk->script); 148 log_debug("spawn: running process (%lld) %s",
149 (long long int)tk->child, tk->script);
145 150
146 tk->status = TASKST_RUNNING; 151 tk->status = TASKST_RUNNING;
147 152
148 if (execl(tk->script, tk->script, tk->job.tag, NULL) < 0) { 153 if (execl(tk->script, tk->script, tk->job_tag, NULL) < 0) {
149 tk->status = TASKST_PENDING; 154 tk->status = TASKST_PENDING;
150 log_warn("exec %s: %s", tk->script, strerror(errno)); 155 log_warn("exec %s: %s", tk->script, strerror(errno));
151 exit(0); 156 exit(0);
152 } 157 }
153 break; 158 break;
211 216
212 if ((tk = find_by_pid(sinfo->si_pid))) { 217 if ((tk = find_by_pid(sinfo->si_pid))) {
213 log_debug("process %lld completed", (long long int)sinfo->si_pid); 218 log_debug("process %lld completed", (long long int)sinfo->si_pid);
214 close(tk->pipe[1]); 219 close(tk->pipe[1]);
215 tk->status = TASKST_COMPLETED; 220 tk->status = TASKST_COMPLETED;
216 tk->retcode = status; 221 tk->exitcode = status;
217 tk->pipe[1] = 0; 222 tk->pipe[1] = 0;
218 } 223 }
219 224
220 #if 0 225 #if 0
221 /* 226 /*
226 if (write(sigpipe[1], &r, sizeof (r)) < 0) 231 if (write(sigpipe[1], &r, sizeof (r)) < 0)
227 err(1, "write"); 232 err(1, "write");
228 #endif 233 #endif
229 } 234 }
230 235
236 static const char *
237 uploadenc(const struct task *tk)
238 {
239 json_t *doc;
240
241 struct jobresult res = {0};
242 char *dump;
243
244 res.job_id = tk->job_id;
245 res.exitcode = tk->exitcode;
246 res.log = tk->out;
247 res.worker_id = worker.id;
248
249 doc = jobresult_to(&res, 1);
250 dump = json_dumps(doc, JSON_COMPACT);
251
252 json_decref(doc);
253
254 return dump;
255 }
256
/*
 * curl write callback: append the received chunk to the memory stream
 * fp.  Returning 0 tells curl to abort the transfer.
 */
static size_t
getcb(char *in, size_t n, size_t w, FILE *fp)
{
	size_t written = fwrite(in, n, w, fp);

	if (written != w) {
		log_warn("get: %s", strerror(errno));
		return 0;
	}

	return w;
}
265
266 static json_t *
267 get(const char *topic, const char *url)
268 {
269 CURL *curl;
270 CURLcode code;
271
272 json_t *doc;
273 json_error_t error;
274
275 char buf[SCI_MSG_MAX];
276 long status;
277 FILE *fp;
278
279 curl = curl_easy_init();
280
281 if (!(fp = fmemopen(buf, sizeof (buf), "w")))
282 err(1, "fmemopen");
283
284 curl_easy_setopt(curl, CURLOPT_URL, url);
285 curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
286 curl_easy_setopt(curl, CURLOPT_TIMEOUT, 3L);
287 curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, getcb);
288 curl_easy_setopt(curl, CURLOPT_WRITEDATA, fp);
289
290 if ((code = curl_easy_perform(curl)) != CURLE_OK)
291 log_warn("%s: %s", topic, curl_easy_strerror(code));
292
293 curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status);
294 curl_easy_cleanup(curl);
295
296 fclose(fp);
297
298 if (code != CURLE_OK)
299 return log_warn("%s: %s", topic, curl_easy_strerror(code)), NULL;
300 if (status != 200)
301 return log_warn("%s: unexpected status code %ld", topic, status), NULL;
302 if (!(doc = json_loads(buf, 0, &error)))
303 return log_warn("%s: %s", topic, error.text), NULL;
304
305 return doc;
306 }
307
/*
 * curl write callback that throws the payload away.  Used for requests
 * where only the HTTP status code matters.
 */
static size_t
silent(char *in, size_t n, size_t w, void *data)
{
	(void)in;
	(void)n;
	(void)data;

	/* Claim everything was consumed so curl does not abort. */
	return w;
}
317
318 static void
319 upload(struct task *tk)
320 {
321 CURL *curl;
322 CURLcode code;
323 long status;
324
325 curl = curl_easy_init();
326 curl_easy_setopt(curl, CURLOPT_URL, makeurl("api/v1/jobs"));
327 curl_easy_setopt(curl, CURLOPT_TIMEOUT, 3L);
328 curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
329 curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, silent);
330 curl_easy_setopt(curl, CURLOPT_POSTFIELDS, uploadenc(tk));
331 code = curl_easy_perform(curl);
332
333 /*
334 * If we fail to upload data, we put the result into syncing mode so
335 * that we retry later without redoing the job over and over
336 */
337 tk->status = TASKST_SYNCING;
338
339 if (code != CURLE_OK)
340 log_warn("upload: %s", curl_easy_strerror(code));
341 else {
342 curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status);
343
344 if (status != 200)
345 log_warn("upload: unexpected return code: %ld", status);
346 else
347 destroy(tk);
348 }
349
350 curl_easy_cleanup(curl);
351 }
352
353 static inline void
354 finished(struct task *tk)
355 {
356 log_info("task %d: completed with exit code %d", tk->child, tk->exitcode);
357 printf("== OUTPUT ==\n");
358 puts(tk->out);
359 upload(tk);
360 }
361
362 static inline int
363 pending(int id)
364 {
365 struct task *t;
366
367 TAILQ_FOREACH(t, &tasks, link)
368 if (t->job_id == id)
369 return 1;
370
371 return 0;
372 }
373
374 static void
375 queue(int id, int project_id, const char *tag)
376 {
377 struct task *tk;
378
379 log_info("queued job build (%d) for tag %s\n", id, tag);
380
381 tk = util_calloc(1, sizeof (*tk));
382 tk->job_id = id;
383 tk->project_id = project_id;
384 strlcpy(tk->job_tag, tag, sizeof (tk->job_tag));
385
386 TAILQ_INSERT_TAIL(&tasks, tk, link);
387 }
388
389 static void
390 merge(json_t *doc)
391 {
392 struct job jobs[SCI_JOB_LIST_MAX];
393 ssize_t jobsz;
394
395 if ((jobsz = job_from(jobs, UTIL_SIZE(jobs), doc)) < 0)
396 log_warn("fetchjobs: %s", strerror(errno));
397 else {
398 for (ssize_t i = 0; i < jobsz; ++i) {
399 if (!pending(jobs[i].id))
400 queue(jobs[i].id, jobs[i].project_id, jobs[i].tag);
401 }
402 }
403
404 json_decref(doc);
405 }
406
407 static void
408 fetchjobs(void)
409 {
410 json_t *doc;
411
412 if (!(doc = get("fetch", makeurl("api/v1/jobs/%s", config.worker))))
413 log_warn("unable to retrieve jobs");
414 else
415 merge(doc);
416 }
417
418 /*
419 * This function reads stdout/stderr pipe from child and optionally remove them
420 * if they have completed.
421 */
422 static void
423 readall(struct fds *fds)
424 {
425 struct task *tk;
426 char buf[BUFSIZ];
427 ssize_t nr;
428
429 for (size_t i = 0; i < fds->listsz; ++i) {
430 if (fds->list[i].revents == 0)
431 continue;
432 if (!(tk = find_by_fd(fds->list[i].fd)))
433 continue;
434
435 /* Read stdout/stderr from children pipe. */
436 if ((nr = read(fds->list[i].fd, buf, sizeof (buf) - 1)) <= 0)
437 tk->status = TASKST_SYNCING;
438 else {
439 buf[nr] = 0;
440 strlcat(tk->out, buf, sizeof (tk->out));
441 }
442 }
443 }
444
445 /*
446 * Retrieve status code from spawned process complete or upload again if they
447 * failed to sync.
448 */
449 static void
450 flushall(void)
451 {
452 struct task *tk, *tmp;
453
454 TAILQ_FOREACH_SAFE(tk, &tasks, link, tmp)
455 if (tk->status == TASKST_SYNCING)
456 upload(tk);
457 }
458
459 static int
460 extract(struct task *tk, json_t *doc)
461 {
462 struct project proj;
463 size_t len;
464
465 if (project_from(&proj, 1, doc) < 0) {
466 json_decref(doc);
467 log_warn("fetchproject: %s", strerror(errno));
468 return -1;
469 }
470
471 len = strlen(proj.script);
472
473 if ((size_t)write(tk->scriptfd, proj.script, len) != len) {
474 json_decref(doc);
475 log_warn("fetchproject: %s", strerror(errno));
476 return -1;
477 }
478
479 /* Close so we can finally spawn it. */
480 close(tk->scriptfd);
481 tk->scriptfd = 0;
482
483 return 0;
484 }
485
486 static int
487 fetchproject(struct task *tk)
488 {
489 json_t *doc;
490
491 if (!(doc = get("fetchproject", makeurl("api/v1/projects/%d", tk->project_id))))
492 return -1;
493
494 return extract(tk, doc);
495 }
496
497 /*
498 * Create a task to run the script. This will retrieve the project script code
499 * at this moment and put it in a temporary file.
500 */
501 static void
502 createtask(struct task *tk)
503 {
504 if (tk->status != TASKST_PENDING)
505 return;
506
507 log_debug("creating task (id=%d, tag=%s)", tk->job_id, tk->job_tag);
508 snprintf(tk->script, sizeof (tk->script), "/tmp/sciworkerd-%d-XXXXXX", tk->job_id);
509
510 if ((tk->scriptfd = mkstemp(tk->script)) < 0 ||
511 fchmod(tk->scriptfd, S_IRUSR | S_IWUSR | S_IXUSR) < 0) {
512 unlink(tk->script);
513 log_warn("%s", strerror(errno));
514 return;
515 }
516
517 if (fetchproject(tk) < 0) {
518 unlink(tk->script);
519 close(tk->scriptfd);
520 tk->scriptfd = 0;
521 } else
522 spawn(tk);
523 }
524
525 /*
526 * Start all pending tasks if the limit of running tasks is not reached.
527 */
528 static void
529 startall(void)
530 {
531 size_t nrunning = 0;
532 struct task *tk;
533
534 TAILQ_FOREACH(tk, &tasks, link)
535 if (tk->status == TASKST_RUNNING)
536 ++nrunning;
537
538 if (nrunning >= (size_t)config.maxbuilds)
539 log_debug("not spawning new process because limit is reached");
540 else {
541 tk = TAILQ_FIRST(&tasks);
542
543 while (tk && nrunning++ < (size_t)config.maxbuilds) {
544 createtask(tk);
545 tk = TAILQ_NEXT(tk, link);
546 }
547 }
548 }
549
550 static void
551 fetchworker(void)
552 {
553 json_t *doc;
554
555 if (!(doc = get("fetchworker", makeurl("api/v1/workers/%s", config.worker))) ||
556 worker_from(&worker, 1, doc) < 0)
557 errx(1, "unable to retrieve worker id");
558
559 log_info("worker id: %d", worker.id);
560 log_info("worker name: %s", worker.name);
561 log_info("worker description: %s", worker.desc);
562
563 json_decref(doc);
564 }
565
231 static void 566 static void
232 init(void) 567 init(void)
233 { 568 {
234 struct sigaction sa; 569 struct sigaction sa;
235 570
239 574
240 if (sigaction(SIGCHLD, &sa, NULL) < 0) 575 if (sigaction(SIGCHLD, &sa, NULL) < 0)
241 err(1, "sigaction"); 576 err(1, "sigaction");
242 577
243 log_open("sciworkerd"); 578 log_open("sciworkerd");
579 fetchworker();
244 580
245 #if 0 581 #if 0
246 if (pipe(sigpipe) < 0) 582 if (pipe(sigpipe) < 0)
247 err(1, "pipe"); 583 err(1, "pipe");
248 if ((flags = fcntl(sigpipe[1], F_GETFL, 0)) < 0 || 584 if ((flags = fcntl(sigpipe[1], F_GETFL, 0)) < 0 ||
266 602
267 #if 0 603 #if 0
268 fds.list[0].fd = sigpipe[0]; 604 fds.list[0].fd = sigpipe[0];
269 fds.list[0].events = POLLIN; 605 fds.list[0].events = POLLIN;
270 #endif 606 #endif
271 printf("fd => %zu\n", fds.listsz);
272 607
273 TAILQ_FOREACH(tk, &tasks, link) { 608 TAILQ_FOREACH(tk, &tasks, link) {
274 if (tk->status == TASKST_RUNNING || tk->status == TASKST_COMPLETED) { 609 if (tk->status == TASKST_RUNNING || tk->status == TASKST_COMPLETED) {
275 printf("adding %d to pollin\n", tk->pipe[0]);
276 fds.list[i].fd = tk->pipe[0]; 610 fds.list[i].fd = tk->pipe[0];
277 fds.list[i++].events = POLLIN | POLLPRI; 611 fds.list[i++].events = POLLIN | POLLPRI;
278 } 612 }
279 } 613 }
280 614
281 return fds; 615 return fds;
282 }
283
284 static const char *
285 uploadenc(const struct task *tk)
286 {
287 static char json[SCI_MSG_MAX];
288 json_t *object;
289
290 object = json_object();
291 json_object_set(object, "code", json_string(tk->out));
292 json_object_set(object, "id", json_integer(tk->job.id));
293 json_object_set(object, "retcode", json_integer(tk->retcode));
294 strlcpy(json, json_dumps(object, JSON_COMPACT), sizeof (json));
295 json_decref(object);
296
297 return json;
298 }
299
300 static size_t
301 getcb(char *in, size_t n, size_t w, FILE *fp)
302 {
303 if (fwrite(in, n, w, fp) != w)
304 return log_warn("get: %s", strerror(errno)), 0;
305
306 return w;
307 }
308
309 static const char *
310 get(const char *topic, const char *url)
311 {
312 CURL *curl;
313 CURLcode code;
314 static char buf[SCI_MSG_MAX];
315 long status;
316 FILE *fp;
317
318 curl = curl_easy_init();
319
320 if (!(fp = fmemopen(buf, sizeof (buf), "w")))
321 err(1, "fmemopen");
322
323 #if 0
324 curl_easy_setopt(curl, CURLOPT_URL, makeurl("api/v1/script/%s", tk->job.project.name));
325 #endif
326 curl_easy_setopt(curl, CURLOPT_URL, url);
327 curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
328 curl_easy_setopt(curl, CURLOPT_TIMEOUT, 3L);
329 curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, getcb);
330 curl_easy_setopt(curl, CURLOPT_WRITEDATA, fp);
331
332 if ((code = curl_easy_perform(curl)) != CURLE_OK)
333 log_warn("%s: %s", topic, curl_easy_strerror(code));
334
335 curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status);
336 curl_easy_cleanup(curl);
337
338 fclose(fp);
339
340 if (code != CURLE_OK)
341 return log_warn("%s: %s", topic, curl_easy_strerror(code)), NULL;
342 if (status != 200)
343 return log_warn("%s: unexpected status code %ld", topic, status), NULL;
344
345 return buf;
346 }
347
348 static size_t
349 silent(char *in, size_t n, size_t w, void *data)
350 {
351 (void)in;
352 (void)n;
353 (void)data;
354
355 return w;
356 }
357
358 static void
359 upload(struct task *tk)
360 {
361 CURL *curl;
362 CURLcode code;
363 long status;
364
365 curl = curl_easy_init();
366 curl_easy_setopt(curl, CURLOPT_URL, makeurl("api/v1/jobs/%s", config.worker));
367 curl_easy_setopt(curl, CURLOPT_TIMEOUT, 3L);
368 curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
369 curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, silent);
370 curl_easy_setopt(curl, CURLOPT_POSTFIELDS, uploadenc(tk));
371 code = curl_easy_perform(curl);
372
373 /*
374 * If we fail to upload data, we put the result into syncing mode so
375 * that we retry later without redoing the job over and over
376 */
377 tk->status = TASKST_SYNCING;
378
379 if (code != CURLE_OK)
380 log_warn("upload: %s", curl_easy_strerror(code));
381 else {
382 curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status);
383
384 if (status != 200)
385 log_warn("upload: unexpected return code: %ld", status);
386 else
387 destroy(tk);
388 }
389
390 curl_easy_cleanup(curl);
391 }
392
393 static inline void
394 finished(struct task *tk)
395 {
396 log_info("task %d: completed with exit code %d", tk->child, tk->retcode);
397 printf("== OUTPUT ==\n");
398 puts(tk->out);
399 upload(tk);
400 }
401
402 static inline int
403 pending(int64_t id)
404 {
405 struct task *t;
406
407 TAILQ_FOREACH(t, &tasks, link)
408 if (t->job.id == id)
409 return 1;
410
411 return 0;
412 }
413
414 static void
415 push(int64_t id, const char *tag, const char *project)
416 {
417 struct task *tk;
418
419 log_info("queued job build (%lld) for project %s, tag %s\n", id, project, tag);
420
421 tk = util_calloc(1, sizeof (*tk));
422 tk->job.id = id;
423 strlcpy(tk->job.tag, tag, sizeof (tk->job.tag));
424 strlcpy(tk->job.project.name, project, sizeof (tk->job.project.name));
425
426 TAILQ_INSERT_TAIL(&tasks, tk, link);
427 }
428
429 static void
430 merge(const char *str)
431 {
432 json_t *array, *obj, *id, *tag, *project;
433 json_error_t err;
434 size_t i;
435
436 if (!(array = json_loads(str, 0, &err))) {
437 log_warn("fetch: failed to decode JSON: %s", err.text);
438 return;
439 }
440 if (!json_is_array(array))
441 goto invalid;
442
443 json_array_foreach(array, i, obj) {
444 if (!json_is_object(obj) ||
445 !json_is_integer((id = json_object_get(obj, "id"))) ||
446 !json_is_string((tag = json_object_get(obj, "tag"))) ||
447 !json_is_string((project = json_object_get(obj, "project"))))
448 goto invalid;
449
450 if (!pending(json_integer_value(id)))
451 push(json_integer_value(id), json_string_value(tag),
452 json_string_value(project));
453 }
454
455 json_decref(array);
456
457 return;
458
459 invalid:
460 log_warn("fetch: invalid JSON input");
461 json_decref(array);
462 }
463
464 static void
465 fetchjobs(void)
466 {
467 const char *json;
468
469 if (!(json = get("fetch", makeurl("api/v1/jobs/%s", config.worker))))
470 log_warn("unable to retrieve jobs");
471 else
472 merge(json);
473 }
474
475 /*
476 * This function reads stdout/stderr pipe from child and optionally remove them
477 * if they have completed.
478 */
479 static void
480 readall(struct fds *fds)
481 {
482 struct task *tk;
483 char buf[BUFSIZ];
484 ssize_t nr;
485
486 for (size_t i = 0; i < fds->listsz; ++i) {
487 if (fds->list[i].revents == 0)
488 continue;
489 if (!(tk = find_by_fd(fds->list[i].fd)))
490 continue;
491
492 /* Read stdout/stderr from children pipe. */
493 if ((nr = read(fds->list[i].fd, buf, sizeof (buf) - 1)) <= 0)
494 tk->status = TASKST_SYNCING;
495 else {
496 buf[nr] = 0;
497 strlcat(tk->out, buf, sizeof (tk->out));
498 }
499 }
500 }
501
502 /*
503 * Retrieve status code from spawned process complete or upload again if they
504 * failed to sync.
505 */
506 static void
507 flushall(void)
508 {
509 struct task *tk, *tmp;
510
511 TAILQ_FOREACH_SAFE(tk, &tasks, link, tmp)
512 if (tk->status == TASKST_SYNCING)
513 upload(tk);
514 }
515
516 static int
517 extract(struct task *tk, const char *json)
518 {
519 json_t *doc, *code;
520 json_error_t err;
521 size_t len;
522
523 if (!(doc = json_loads(json, 0, &err))) {
524 log_warn("fetchscript: failed to decode JSON: %s", err.text);
525 return -1;
526 }
527 if (!json_is_object(doc) ||
528 !json_is_string((code = json_object_get(doc, "code"))))
529 goto invalid;
530
531 len = strlen(json_string_value(code));
532
533 if ((size_t)write(tk->scriptfd, json_string_value(code), len) != len) {
534 log_warn("fetchscript: %s", strerror(errno));
535 json_decref(doc);
536
537 return -1;
538 }
539
540 /* Close so we can finally spawn it. */
541 close(tk->scriptfd);
542 tk->scriptfd = 0;
543
544 return 0;
545
546 invalid:
547 log_warn("fetchscript: invalid JSON");
548 json_decref(doc);
549
550 return -1;
551 }
552
553 static int
554 fetchscript(struct task *tk)
555 {
556 const char *json;
557
558 if (!(json = get("fetchscript", makeurl("api/v1/script/%s", tk->job.project.name))))
559 return -1;
560
561 return extract(tk, json);
562 }
563
564 static void
565 createtask(struct task *tk)
566 {
567 if (tk->status != TASKST_PENDING)
568 return;
569
570 log_debug("creating task (id=%lld, project=%s, tag=%s)",
571 tk->job.id, tk->job.project.name, tk->job.tag);
572
573 snprintf(tk->script, sizeof (tk->script), "/tmp/sciworkerd-%s-XXXXXX",
574 tk->job.project.name);
575
576 if ((tk->scriptfd = mkstemp(tk->script)) < 0 ||
577 fchmod(tk->scriptfd, S_IRUSR | S_IWUSR | S_IXUSR) < 0) {
578 unlink(tk->script);
579 log_warn("%s", strerror(errno));
580 return;
581 }
582
583 if (fetchscript(tk) < 0) {
584 unlink(tk->script);
585 close(tk->scriptfd);
586 tk->scriptfd = 0;
587 } else
588 spawn(tk);
589 }
590
591 static void
592 startall(void)
593 {
594 size_t nrunning = 0;
595 struct task *tk;
596
597 TAILQ_FOREACH(tk, &tasks, link)
598 if (tk->status == TASKST_RUNNING)
599 ++nrunning;
600
601 if (nrunning >= (size_t)config.maxbuilds) {
602 log_debug("not spawning new process because limit is reached");
603 } else {
604 tk = TAILQ_FIRST(&tasks);
605
606 while (tk && nrunning++ < (size_t)config.maxbuilds) {
607 createtask(tk);
608 tk = TAILQ_NEXT(tk, link);
609 }
610 }
611 } 616 }
612 617
613 static void 618 static void
614 run(void) 619 run(void)
615 { 620 {