Mercurial > sci
view sciworkerd/sciworkerd.c @ 19:de4bf839b565
misc: revamp SQL
author | David Demelier <markand@malikania.fr> |
---|---|
date | Fri, 15 Jul 2022 11:11:48 +0200 |
parents | |
children | f98ea578b1ef |
line wrap: on
line source
#include <errno.h> #include <poll.h> #include <signal.h> #include <string.h> #include <time.h> #include <utlist.h> #include "apic.h" #include "log.h" #include "sciworkerd.h" #include "task.h" #include "types.h" #include "util.h" #define TAG "sigworkerd: " struct taskentry { struct task *task; struct job job; struct taskentry *next; }; static struct taskentry *taskpending; static struct taskentry *tasks; static struct taskentry *taskfinished; static struct worker worker; static int run = 1; struct sciworkerd sciworkerd = { .fetchinterval = 300, .maxjobs = 4, .timeout = 600 }; static inline void taskentry_free(struct taskentry *entry) { if (task_status(entry->task) == TASKSTATUS_RUNNING) { if (task_kill(entry->task) == 0) task_wait(entry->task); } task_free(entry->task); free(entry); } static void stop(int sign) { log_info(TAG "exiting on signal %d\n", sign); run = 0; } static inline int pending(int id) { const struct taskentry *iter; LL_FOREACH(taskpending, iter) if (iter->job.id == id) return 1; return 0; } static inline void queue(const struct job *job) { struct taskentry *tk; log_info(TAG "queued job build (%d) for tag %s\n", job->id, job->tag); tk = util_calloc(1, sizeof (*tk)); tk->task = task_new(job->tag); memcpy(&tk->job, job, sizeof (*job)); LL_APPEND(taskpending, tk); } static void merge(json_t *doc) { struct job jobs[SCI_JOB_LIST_MAX]; ssize_t jobsz; size_t total = 0; if ((jobsz = job_from(jobs, UTIL_SIZE(jobs), doc)) < 0) log_warn(TAG "error while parsing jobs: %s", strerror(errno)); else { for (ssize_t i = 0; i < jobsz; ++i) { if (!pending(jobs[i].id)) { queue(&jobs[i]); total++; } } log_info(TAG "added %zu new pending tasks", total); } } /* * Fetch jobs periodically, depending on the user setting. */ static void fetch_jobs(void) { static time_t startup; time_t now; struct apicreq req; if (!startup) startup = time(NULL); if (difftime((now = time(NULL)), startup) >= sciworkerd.fetchinterval) { startup = now; if (apic_get(&req, "%s/api/v1/%s", sciworkerd.url, sciworkerd.name) < 0) log_warn(TAG "unable to fetch jobs: %s", req.error); if (req.doc) { merge(req.doc); json_decref(req.doc); } } } /* * Fetch information about myself. */ static void fetch_worker(void) { struct apicreq req; if (apic_get(&req, "%s/api/v1/workers/%s", sciworkerd.url, sciworkerd.name) < 0) log_warn(TAG "unable to fetch worker info: %s", req.error); if (!req.doc) log_die(TAG "empty worker response"); if (worker_from(&worker, 1, req.doc) < 0) log_die(TAG "unable to parse worker", strerror(errno)); log_info("worker id: %d", worker.id); log_info("worker name: %s", worker.name); log_info("worker description: %s", worker.desc); json_decref(req.doc); } /* * Fetch information about a project. */ static int fetch_project(struct project *project, int id) { struct apicreq req; if (apic_get(&req, "%s/api/v1/projects/%d", id) < 0) return log_warn(TAG "unable to fetch project info: %s", req.error), -1; if (!req.doc) return log_warn(TAG "empty project response"), -1; if (project_from(project, 1, req.doc) < 0) return log_warn(TAG "unable to parse project: %s", strerror(errno)), -1; return 0; } static inline size_t count(const struct taskentry *head) { const struct taskentry *iter; size_t tot = 0; LL_FOREACH(head, iter) tot++; return tot; } /* * Start a task. We fetch its script code and then create the task with that * script. */ static int start(struct taskentry *entry) { struct project project; pid_t pid; if (fetch_project(&project, entry->job.project_id) < 0) return log_warn(TAG "unable to fetch project, dropping task"), -1; if (task_setup(entry->task, project.script) < 0) return log_warn(TAG "unable to setup script code: %s, dropping task", strerror(errno)), -1; if ((pid = task_start(entry->task)) < 0) return log_warn(TAG "unable to spawn task process: %s", strerror(errno)), -1; log_info(TAG "task %lld spawned", (long long int)pid); return 0; } static inline void delete(struct taskentry *entry) { LL_DELETE(taskpending, entry); task_free(entry->task); free(entry); } static void start_all(void) { size_t running = count(tasks); struct taskentry *entry; while (running-- > 0 && (entry = taskpending)) { if (start(entry) < 0) delete(entry); else { LL_DELETE(taskpending, entry); LL_APPEND(tasks, entry); } } } static void process_all(void) { struct taskentry *iter, *next; struct taskcode code; struct pollfd *fds; size_t fdsz, i = 0; int ret; /* First, read every pipes. */ if (!(fdsz = count(tasks))) return; fds = util_calloc(fdsz, sizeof (*fds)); for (iter = tasks; iter; iter = iter->next) task_prepare(iter->task, &fds[i++]); if (poll(fds, fdsz, 5000) < 0) log_warn("poll: %s", strerror(errno)); for (iter = tasks, i = 0; i < fdsz; ++i) { next = iter->next; /* * 0: EOF [wait] * -1: error [kill + wait] * >0: keep going [nothing] */ if ((ret = task_sync(iter->task, &fds[i])) < 0) { log_warn(TAG "pipe error: %s, killing task", strerror(errno)); if (task_kill(iter->task) < 0) log_warn(TAG "task kill error: %s", strerror(errno)); } /* Now wait for the task to complete. */ if (ret <= 0) { if (task_wait(iter->task) < 0) log_warn(TAG "task wait error: %s", strerror(errno)); else { code = task_code(iter->task); switch (task_status(iter->task)) { case TASKSTATUS_EXITED: log_info(TAG "task %lld exited with code %d", (long long int)task_pid(iter->task), code.exitcode); break; case TASKSTATUS_KILLED: log_info(TAG "task %lld killed with signal %d", (long long int)task_pid(iter->task), code.sigcode); break; default: break; } } /* Remove that task and push to the outgoing queue. */ next = iter->next; LL_DELETE(tasks, iter); LL_APPEND(taskfinished, iter); } } free(fds); } /* * Kill all tasks that have been running for too long. */ static void ghost_all(void) { struct taskentry *iter, *tmp; time_t now; LL_FOREACH_SAFE(tasks, iter, tmp) { if (difftime(time(NULL), task_uptime(iter->task)) < sciworkerd.timeout) continue; /* Do not attempt to wait if kill failed to avoid lock. */ log_info(TAG "task timeout, killing"); if (task_kill(iter->task) == 0) task_wait(iter->task); LL_DELETE(tasks, iter); LL_APPEND(taskfinished, iter); } } static int publish(struct taskentry *iter) { // TODO: add sigcode. struct taskcode code = task_code(iter->task); struct jobresult res = { .job_id = iter->job.id, .exitcode = code.exitcode, .log = task_console(iter->task), .worker_id = worker.id }; struct apicreq req; json_t *doc; int ret; doc = jobresult_to(&res, 1); ret = apic_post(&req, doc, "%s/api/v1/jobs", sciworkerd.url); json_decref(doc); if (ret) log_warn(TAG "unable to publish task: %s", req.error); else log_info(TAG "task successfully published"); return ret; } static void publish_all(void) { struct taskentry *iter, *tmp; LL_FOREACH_SAFE(taskfinished, iter, tmp) { if (publish(iter) == 0) { LL_DELETE(taskfinished, iter); taskentry_free(iter); } } } void sciworkerd_init(void) { struct sigaction sa = {0}; log_open("sigworkerd"); sigemptyset(&sa.sa_mask); sa.sa_handler = stop; if (sigaction(SIGINT, &sa, NULL) < 0 || sigaction(SIGTERM, &sa, NULL) < 0) log_die(TAG "sigaction: %s", strerror(errno)); fetch_worker(); } void sciworkerd_run(void) { while (run) { fetch_jobs(); process_all(); ghost_all(); publish_all(); } } void sciworkerd_finish(void) { struct taskentry *iter, *tmp; LL_FOREACH_SAFE(taskpending, iter, tmp) taskentry_free(iter); LL_FOREACH_SAFE(tasks, iter, tmp) taskentry_free(iter); LL_FOREACH_SAFE(taskfinished, iter, tmp) taskentry_free(iter); }