Mercurial > sci
diff sciworkerd/sciworkerd.c @ 24:34cbbd215ef7
misc: add basic support for jobresults
author | David Demelier <markand@malikania.fr> |
---|---|
date | Mon, 25 Jul 2022 21:11:23 +0200 |
parents | 2cb228f23f53 |
children | dae2de19ca5d |
line wrap: on
line diff
```diff
--- a/sciworkerd/sciworkerd.c	Thu Jul 21 21:55:02 2022 +0200
+++ b/sciworkerd/sciworkerd.c	Mon Jul 25 21:11:23 2022 +0200
@@ -13,7 +13,7 @@
 #include "types.h"
 #include "util.h"
 
-#define TAG "sigworkerd: "
+#define TAG "sciworkerd: "
 
 struct taskentry {
 	struct task *task;
@@ -48,7 +48,7 @@
 static void
 stop(int sign)
 {
-	log_info(TAG "exiting on signal %d\n", sign);
+	log_info(TAG "exiting on signal %d", sign);
 	run = 0;
 }
 
@@ -69,7 +69,7 @@
 {
 	struct taskentry *tk;
 
-	log_info(TAG "queued job build (%d) for tag %s\n", job->id, job->tag);
+	log_info(TAG "queued job build (%d) for tag %s", job->id, job->tag);
 
 	tk = util_calloc(1, sizeof (*tk));
 	tk->task = task_new(job->tag);
@@ -82,7 +82,7 @@
 {
 	size_t total = 0;
 
-	for (ssize_t i = 0; i < jobsz; ++i) {
+	for (size_t i = 0; i < jobsz; ++i) {
 		if (!pending(jobs[i].id)) {
 			queue(&jobs[i]);
 			total++;
@@ -130,9 +130,7 @@
 	if (apic_worker_find(&req, &worker, sciworkerd.name) < 0)
 		log_die(TAG "unable to fetch worker info: %s", req.error);
 
-	log_info("worker name: %s", worker.name);
-	log_info("worker description: %s", worker.desc);
-
+	log_info("sciworkerd: worker %s (%s)", worker.name, worker.desc);
 	apic_finish(&req);
 }
 
@@ -191,10 +189,11 @@
 	size_t running = count(tasks);
 	struct taskentry *entry;
 
-	while (running-- > 0 && (entry = taskpending)) {
+	while (running < sciworkerd.maxjobs && (entry = taskpending)) {
 		if (start(entry) < 0)
 			delete(entry);
 		else {
+			running++;
 			LL_DELETE(taskpending, entry);
 			LL_APPEND(tasks, entry);
 		}
@@ -208,6 +207,7 @@
 	struct taskcode code;
 	struct pollfd *fds;
 	size_t fdsz, i = 0;
+	pid_t pid;
 	int ret;
 
 	/* First, read every pipes. */
@@ -239,6 +239,8 @@
 
 		/* Now wait for the task to complete. */
 		if (ret <= 0) {
+			pid = task_pid(iter->task);
+
 			if (task_wait(iter->task) < 0)
 				log_warn(TAG "task wait error: %s", strerror(errno));
 			else {
@@ -247,11 +249,11 @@
 				switch (task_status(iter->task)) {
 				case TASKSTATUS_EXITED:
 					log_info(TAG "task %lld exited with code %d",
-					    (long long int)task_pid(iter->task), code.exitcode);
+					    (long long int)pid, code.exitcode);
 					break;
 				case TASKSTATUS_KILLED:
 					log_info(TAG "task %lld killed with signal %d",
-					    (long long int)task_pid(iter->task), code.sigcode);
+					    (long long int)pid, code.sigcode);
 					break;
 				default:
 					break;
@@ -259,10 +261,11 @@
 			}
 
 			/* Remove that task and push to the outgoing queue. */
-			next = iter->next;
 			LL_DELETE(tasks, iter);
 			LL_APPEND(taskfinished, iter);
 		}
+
+		iter = next;
 	}
 
 	free(fds);
@@ -275,7 +278,6 @@
 ghost_all(void)
 {
 	struct taskentry *iter, *tmp;
-	time_t now;
 
 	LL_FOREACH_SAFE(tasks, iter, tmp) {
 		if (difftime(time(NULL), task_uptime(iter->task)) < sciworkerd.timeout)
@@ -304,20 +306,15 @@
 		.worker_name = worker.name
 	};
 	struct apic req;
-	json_t *doc;
-	int ret;
-
-	doc = jobresult_to(&res, 1);
-	ret = apic_post(&req, doc, "%s/api/v1/jobs", sciworkerd.url);
-	json_decref(doc);
+	int ret = 0;
 
-	if (ret)
+	if (apic_jobresult_add(&req, &res) < 0) {
 		log_warn(TAG "unable to publish task: %s", req.error);
-	else
-		log_info(TAG "task successfully published");
+		ret = -1;
+	}
 
+	apic_finish(&req);
 	jobresult_finish(&res);
-	apic_finish(&req);
 
 	return ret;
 }
@@ -358,6 +355,7 @@
 {
 	while (run) {
 		fetch_jobs();
+		start_all();
 		process_all();
 		ghost_all();
 		publish_all();
```