Mercurial > sci
diff sciworkerd/sciworkerd.c @ 27:dae2de19ca5d
misc: switch to JSON everywhere
author | David Demelier <markand@malikania.fr> |
---|---|
date | Wed, 03 Aug 2022 15:18:09 +0200 |
parents | 34cbbd215ef7 |
children | e52c762d8ba8 |
line wrap: on
line diff
--- a/sciworkerd/sciworkerd.c Tue Aug 02 13:24:13 2022 +0200 +++ b/sciworkerd/sciworkerd.c Wed Aug 03 15:18:09 2022 +0200 @@ -1,6 +1,7 @@ #include <errno.h> #include <poll.h> #include <signal.h> +#include <stdint.h> #include <string.h> #include <time.h> @@ -10,21 +11,21 @@ #include "log.h" #include "sciworkerd.h" #include "task.h" -#include "types.h" #include "util.h" #define TAG "sciworkerd: " struct taskentry { + intmax_t job_id; + char *tag; + char *project_name; struct task *task; - struct job job; struct taskentry *next; }; static struct taskentry *taskpending; static struct taskentry *tasks; static struct taskentry *taskfinished; -static struct worker worker; static int run = 1; struct sciworkerd sciworkerd = { @@ -53,41 +54,57 @@ } static inline int -pending(int id) +pending(intmax_t job_id) { const struct taskentry *iter; LL_FOREACH(taskpending, iter) - if (iter->job.id == id) + if (iter->job_id == job_id) return 1; return 0; } static inline void -queue(const struct job *job) +queue(intmax_t id, const char *tag, const char *project_name) { struct taskentry *tk; - log_info(TAG "queued job build (%d) for tag %s", job->id, job->tag); + log_info(TAG "queued job build (%d) for tag %s", id, tag); tk = util_calloc(1, sizeof (*tk)); - tk->task = task_new(job->tag); - memcpy(&tk->job, job, sizeof (*job)); + tk->job_id = id; + tk->tag = util_strdup(tag); + tk->project_name = util_strdup(project_name); + tk->task = task_new(tag); LL_APPEND(taskpending, tk); } static void -merge(struct job *jobs, size_t jobsz) +merge(json_t *jobs) { - size_t total = 0; + json_int_t id; + json_t *val; + const char *tag, *project_name; + size_t total = 0, i; + int parse; - for (size_t i = 0; i < jobsz; ++i) { - if (!pending(jobs[i].id)) { - queue(&jobs[i]); + json_array_foreach(jobs, i, val) { + parse = json_unpack(val, "{sI ss ss}", + "id", &id, + "tag", &tag, + "project_name", &project_name + ); + + if (parse < 0) { + log_warn(TAG "unable to parse job"); + continue; + } + + if (!pending(id)) { + queue(id, tag, project_name); total++; - } else - job_finish(&jobs[i]); + } } log_info(TAG "added %zu new pending tasks", total); @@ -102,38 +119,21 @@ static time_t startup; time_t now; struct apic req; - struct job todo[SCI_JOB_LIST_MAX]; - ssize_t todosz; + json_t *jobs; if (difftime((now = time(NULL)), startup) >= sciworkerd.fetchinterval) { startup = now; - log_info(TAG "fetching jobs"); - if ((todosz = apic_job_todo(&req, todo, UTIL_SIZE(todo), worker.name)) < 0) + if (!(jobs = apic_job_todo(&req, sciworkerd.name))) log_warn(TAG "unable to fetch jobs: %s", req.error); - else - merge(todo, todosz); - - apic_finish(&req); + else { + merge(jobs); + json_decref(jobs); + } } } -/* - * Fetch information about myself. - */ -static void -fetch_worker(void) -{ - struct apic req; - - if (apic_worker_find(&req, &worker, sciworkerd.name) < 0) - log_die(TAG "unable to fetch worker info: %s", req.error); - - log_info("sciworkerd: worker %s (%s)", worker.name, worker.desc); - apic_finish(&req); -} - static inline size_t count(const struct taskentry *head) { @@ -154,14 +154,19 @@ start(struct taskentry *entry) { struct apic req; - struct project project; + json_t *doc; + const char *script; pid_t pid; int ret = -1; - if (apic_project_find(&req, &project, entry->job.project_name) < 0) + if (!(doc = apic_project_find(&req, entry->project_name))) return log_warn(TAG "unable to fetch project, dropping task"), -1; + if (json_unpack(doc, "{ss}", "script", &script) < 0) { + json_decref(doc); + return log_warn(TAG "invalid project JSON object"), -1; + } - if (task_setup(entry->task, project.script) < 0) + if (task_setup(entry->task, script) < 0) log_warn(TAG "unable to setup script code: %s, dropping task", strerror(errno)); else if ((pid = task_start(entry->task)) < 0) log_warn(TAG "unable to spawn task process: %s", strerror(errno)); @@ -170,7 +175,7 @@ ret = 0; } - project_finish(&project); + json_decref(doc); return ret; } @@ -297,24 +302,24 @@ static int publish(struct taskentry *iter) { - // TODO: add sigcode. - struct taskcode code = task_code(iter->task); - struct jobresult res = { - .job_id = iter->job.id, - .exitcode = code.exitcode, - .log = util_strdup(task_console(iter->task)), - .worker_name = worker.name - }; struct apic req; + json_t *obj; int ret = 0; - if (apic_jobresult_add(&req, &res) < 0) { + obj = json_pack("{sI ss ss si si}", + "job_id", iter->job_id, + "worker_name", sciworkerd.name, + "console", task_console(iter->task), + "exitcode", task_code(iter->task).exitcode, + "sigcode", task_code(iter->task).sigcode + ); + + if (apic_jobresult_add(&req, obj) < 0) { log_warn(TAG "unable to publish task: %s", req.error); ret = -1; } - apic_finish(&req); - jobresult_finish(&res); + json_decref(obj); return ret; } @@ -339,6 +344,9 @@ log_open("sigworkerd"); + if (strlen(sciworkerd.name) == 0) + log_die(TAG "no worker name defined"); + sigemptyset(&sa.sa_mask); sa.sa_handler = stop; @@ -346,8 +354,6 @@ if (sigaction(SIGINT, &sa, NULL) < 0 || sigaction(SIGTERM, &sa, NULL) < 0) log_die(TAG "sigaction: %s", strerror(errno)); - - fetch_worker(); } void