Mercurial > sci
view sciworkerd/sciworkerd.c @ 34:e52c762d8ba8
misc: cleanups
author | David Demelier <markand@malikania.fr> |
---|---|
date | Thu, 04 Aug 2022 17:47:19 +0200 |
parents | dae2de19ca5d |
children | 4076b07c7a6f |
line wrap: on
line source
/* * sciworkerd.c -- main sciworkerd file * * Copyright (c) 2021-2022 David Demelier <markand@malikania.fr> * * Permission to use, copy, modify, and/or distribute this software for any * purpose with or without fee is hereby granted, provided that the above * copyright notice and this permission notice appear in all copies. * * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. */ #include <errno.h> #include <poll.h> #include <signal.h> #include <stdint.h> #include <string.h> #include <time.h> #include <utlist.h> #include "apic.h" #include "log.h" #include "sciworkerd.h" #include "task.h" #include "util.h" #define TAG "sciworkerd: " struct taskentry { intmax_t job_id; char *tag; char *project_name; struct task *task; struct taskentry *next; }; static struct taskentry *taskpending; static struct taskentry *tasks; static struct taskentry *taskfinished; static int run = 1; struct sciworkerd sciworkerd = { .fetchinterval = 300, .maxjobs = 4, .timeout = 600 }; static inline void taskentry_free(struct taskentry *entry) { if (task_status(entry->task) == TASKSTATUS_RUNNING) { if (task_kill(entry->task) == 0) task_wait(entry->task); } task_free(entry->task); free(entry); } static void stop(int sign) { log_info(TAG "exiting on signal %d", sign); run = 0; } static inline int pending(intmax_t job_id) { const struct taskentry *iter; LL_FOREACH(taskpending, iter) if (iter->job_id == job_id) return 1; return 0; } static inline void queue(intmax_t id, const char *tag, const char *project_name) { struct taskentry *tk; log_info(TAG "queued job build (%d) for tag %s", id, tag); tk = util_calloc(1, sizeof (*tk)); tk->job_id = id; tk->tag = util_strdup(tag); tk->project_name = util_strdup(project_name); tk->task = task_new(tag); LL_APPEND(taskpending, tk); } static void merge(json_t *jobs) { json_int_t id; json_t *val; const char *tag, *project_name; size_t total = 0, i; int parse; json_array_foreach(jobs, i, val) { parse = json_unpack(val, "{sI ss ss}", "id", &id, "tag", &tag, "project_name", &project_name ); if (parse < 0) { log_warn(TAG "unable to parse job"); continue; } if (!pending(id)) { queue(id, tag, project_name); total++; } } log_info(TAG "added %zu new pending tasks", total); } /* * Fetch jobs periodically, depending on the user setting. */ static void fetch_jobs(void) { static time_t startup; time_t now; struct apic req; json_t *jobs; if (difftime((now = time(NULL)), startup) >= sciworkerd.fetchinterval) { startup = now; log_info(TAG "fetching jobs"); if (!(jobs = apic_job_todo(&req, sciworkerd.name))) log_warn(TAG "unable to fetch jobs: %s", req.error); else { merge(jobs); json_decref(jobs); } } } static inline size_t count(const struct taskentry *head) { const struct taskentry *iter; size_t tot = 0; LL_FOREACH(head, iter) tot++; return tot; } /* * Start a task. We fetch its script code and then create the task with that * script. */ static int start(struct taskentry *entry) { struct apic req; json_t *doc; const char *script; pid_t pid; int ret = -1; if (!(doc = apic_project_find(&req, entry->project_name))) return log_warn(TAG "unable to fetch project, dropping task"), -1; if (json_unpack(doc, "{ss}", "script", &script) < 0) { json_decref(doc); return log_warn(TAG "invalid project JSON object"), -1; } if (task_setup(entry->task, script) < 0) log_warn(TAG "unable to setup script code: %s, dropping task", strerror(errno)); else if ((pid = task_start(entry->task)) < 0) log_warn(TAG "unable to spawn task process: %s", strerror(errno)); else { log_info(TAG "task %lld spawned", (long long int)pid); ret = 0; } json_decref(doc); return ret; } static inline void delete(struct taskentry *entry) { LL_DELETE(taskpending, entry); task_free(entry->task); free(entry); } static void start_all(void) { size_t running = count(tasks); struct taskentry *entry; while (running < sciworkerd.maxjobs && (entry = taskpending)) { if (start(entry) < 0) delete(entry); else { running++; LL_DELETE(taskpending, entry); LL_APPEND(tasks, entry); } } } static void process_all(void) { struct taskentry *iter, *next; struct taskcode code; struct pollfd *fds; size_t fdsz, i = 0; pid_t pid; int ret; /* First, read every pipes. */ if (!(fdsz = count(tasks))) return; fds = util_calloc(fdsz, sizeof (*fds)); for (iter = tasks; iter; iter = iter->next) task_prepare(iter->task, &fds[i++]); if (poll(fds, fdsz, 5000) < 0) log_warn("poll: %s", strerror(errno)); for (iter = tasks, i = 0; i < fdsz; ++i) { next = iter->next; /* * 0: EOF [wait] * -1: error [kill + wait] * >0: keep going [nothing] */ if ((ret = task_sync(iter->task, &fds[i])) < 0) { log_warn(TAG "pipe error: %s, killing task", strerror(errno)); if (task_kill(iter->task) < 0) log_warn(TAG "task kill error: %s", strerror(errno)); } /* Now wait for the task to complete. */ if (ret <= 0) { pid = task_pid(iter->task); if (task_wait(iter->task) < 0) log_warn(TAG "task wait error: %s", strerror(errno)); else { code = task_code(iter->task); switch (task_status(iter->task)) { case TASKSTATUS_EXITED: log_info(TAG "task %lld exited with code %d", (long long int)pid, code.exitcode); break; case TASKSTATUS_KILLED: log_info(TAG "task %lld killed with signal %d", (long long int)pid, code.sigcode); break; default: break; } } /* Remove that task and push to the outgoing queue. */ LL_DELETE(tasks, iter); LL_APPEND(taskfinished, iter); } iter = next; } free(fds); } /* * Kill all tasks that have been running for too long. */ static void ghost_all(void) { struct taskentry *iter, *tmp; LL_FOREACH_SAFE(tasks, iter, tmp) { if (difftime(time(NULL), task_uptime(iter->task)) < sciworkerd.timeout) continue; /* Do not attempt to wait if kill failed to avoid lock. */ log_info(TAG "task timeout, killing"); if (task_kill(iter->task) == 0) task_wait(iter->task); LL_DELETE(tasks, iter); LL_APPEND(taskfinished, iter); } } static int publish(struct taskentry *iter) { struct apic req; json_t *obj; int ret = 0; obj = json_pack("{sI ss ss si si}", "job_id", iter->job_id, "worker_name", sciworkerd.name, "console", task_console(iter->task), "exitcode", task_code(iter->task).exitcode, "sigcode", task_code(iter->task).sigcode ); if (apic_jobresult_add(&req, obj) < 0) { log_warn(TAG "unable to publish task: %s", req.error); ret = -1; } json_decref(obj); return ret; } static void publish_all(void) { struct taskentry *iter, *tmp; LL_FOREACH_SAFE(taskfinished, iter, tmp) { if (publish(iter) == 0) { LL_DELETE(taskfinished, iter); taskentry_free(iter); } } } void sciworkerd_init(void) { struct sigaction sa = {0}; log_open("sigworkerd"); if (strlen(sciworkerd.name) == 0) log_die(TAG "no worker name defined"); sigemptyset(&sa.sa_mask); sa.sa_handler = stop; util_strlcpy(apiconf.baseurl, sciworkerd.url, sizeof (apiconf.baseurl)); if (sigaction(SIGINT, &sa, NULL) < 0 || sigaction(SIGTERM, &sa, NULL) < 0) log_die(TAG "sigaction: %s", strerror(errno)); } void sciworkerd_run(void) { while (run) { fetch_jobs(); start_all(); process_all(); ghost_all(); publish_all(); } } void sciworkerd_finish(void) { struct taskentry *iter, *tmp; LL_FOREACH_SAFE(taskpending, iter, tmp) taskentry_free(iter); LL_FOREACH_SAFE(tasks, iter, tmp) { task_kill(iter->task); taskentry_free(iter); } LL_FOREACH_SAFE(taskfinished, iter, tmp) taskentry_free(iter); }