Mercurial > sci
diff sciworkerd/sciworkerd.c @ 19:de4bf839b565
misc: revamp SQL
author | David Demelier <markand@malikania.fr> |
---|---|
date | Fri, 15 Jul 2022 11:11:48 +0200 |
parents | |
children | f98ea578b1ef |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sciworkerd/sciworkerd.c Fri Jul 15 11:11:48 2022 +0200 @@ -0,0 +1,393 @@ +#include <errno.h> +#include <poll.h> +#include <signal.h> +#include <string.h> +#include <time.h> + +#include <utlist.h> + +#include "apic.h" +#include "log.h" +#include "sciworkerd.h" +#include "task.h" +#include "types.h" +#include "util.h" + +#define TAG "sigworkerd: " + +struct taskentry { + struct task *task; + struct job job; + struct taskentry *next; +}; + +static struct taskentry *taskpending; +static struct taskentry *tasks; +static struct taskentry *taskfinished; +static struct worker worker; +static int run = 1; + +struct sciworkerd sciworkerd = { + .fetchinterval = 300, + .maxjobs = 4, + .timeout = 600 +}; + +static inline void +taskentry_free(struct taskentry *entry) +{ + if (task_status(entry->task) == TASKSTATUS_RUNNING) { + if (task_kill(entry->task) == 0) + task_wait(entry->task); + } + + task_free(entry->task); + free(entry); +} + +static void +stop(int sign) +{ + log_info(TAG "exiting on signal %d\n", sign); + run = 0; +} + +static inline int +pending(int id) +{ + const struct taskentry *iter; + + LL_FOREACH(taskpending, iter) + if (iter->job.id == id) + return 1; + + return 0; +} + +static inline void +queue(const struct job *job) +{ + struct taskentry *tk; + + log_info(TAG "queued job build (%d) for tag %s\n", job->id, job->tag); + + tk = util_calloc(1, sizeof (*tk)); + tk->task = task_new(job->tag); + memcpy(&tk->job, job, sizeof (*job)); + LL_APPEND(taskpending, tk); +} + +static void +merge(json_t *doc) +{ + struct job jobs[SCI_JOB_LIST_MAX]; + ssize_t jobsz; + size_t total = 0; + + if ((jobsz = job_from(jobs, UTIL_SIZE(jobs), doc)) < 0) + log_warn(TAG "error while parsing jobs: %s", strerror(errno)); + else { + for (ssize_t i = 0; i < jobsz; ++i) { + if (!pending(jobs[i].id)) { + queue(&jobs[i]); + total++; + } + } + + log_info(TAG "added %zu new pending tasks", total); + } +} + +/* + * Fetch jobs periodically, depending on the user setting. + */ +static void +fetch_jobs(void) +{ + static time_t startup; + time_t now; + struct apicreq req; + + if (!startup) + startup = time(NULL); + + if (difftime((now = time(NULL)), startup) >= sciworkerd.fetchinterval) { + startup = now; + + if (apic_get(&req, "%s/api/v1/%s", sciworkerd.url, sciworkerd.name) < 0) + log_warn(TAG "unable to fetch jobs: %s", req.error); + if (req.doc) { + merge(req.doc); + json_decref(req.doc); + } + } +} + +/* + * Fetch information about myself. + */ +static void +fetch_worker(void) +{ + struct apicreq req; + + if (apic_get(&req, "%s/api/v1/workers/%s", sciworkerd.url, sciworkerd.name) < 0) + log_warn(TAG "unable to fetch worker info: %s", req.error); + if (!req.doc) + log_die(TAG "empty worker response"); + if (worker_from(&worker, 1, req.doc) < 0) + log_die(TAG "unable to parse worker", strerror(errno)); + + log_info("worker id: %d", worker.id); + log_info("worker name: %s", worker.name); + log_info("worker description: %s", worker.desc); + + json_decref(req.doc); +} + +/* + * Fetch information about a project. + */ +static int +fetch_project(struct project *project, int id) +{ + struct apicreq req; + + if (apic_get(&req, "%s/api/v1/projects/%d", id) < 0) + return log_warn(TAG "unable to fetch project info: %s", req.error), -1; + if (!req.doc) + return log_warn(TAG "empty project response"), -1; + if (project_from(project, 1, req.doc) < 0) + return log_warn(TAG "unable to parse project: %s", strerror(errno)), -1; + + return 0; +} + +static inline size_t +count(const struct taskentry *head) +{ + const struct taskentry *iter; + size_t tot = 0; + + LL_FOREACH(head, iter) + tot++; + + return tot; +} + +/* + * Start a task. We fetch its script code and then create the task with that + * script. + */ +static int +start(struct taskentry *entry) +{ + struct project project; + pid_t pid; + + if (fetch_project(&project, entry->job.project_id) < 0) + return log_warn(TAG "unable to fetch project, dropping task"), -1; + if (task_setup(entry->task, project.script) < 0) + return log_warn(TAG "unable to setup script code: %s, dropping task", strerror(errno)), -1; + if ((pid = task_start(entry->task)) < 0) + return log_warn(TAG "unable to spawn task process: %s", strerror(errno)), -1; + + log_info(TAG "task %lld spawned", (long long int)pid); + + return 0; +} + +static inline void +delete(struct taskentry *entry) +{ + LL_DELETE(taskpending, entry); + task_free(entry->task); + free(entry); +} + +static void +start_all(void) +{ + size_t running = count(tasks); + struct taskentry *entry; + + while (running-- > 0 && (entry = taskpending)) { + if (start(entry) < 0) + delete(entry); + else { + LL_DELETE(taskpending, entry); + LL_APPEND(tasks, entry); + } + } +} + +static void +process_all(void) +{ + struct taskentry *iter, *next; + struct taskcode code; + struct pollfd *fds; + size_t fdsz, i = 0; + int ret; + + /* First, read every pipes. */ + if (!(fdsz = count(tasks))) + return; + + fds = util_calloc(fdsz, sizeof (*fds)); + + for (iter = tasks; iter; iter = iter->next) + task_prepare(iter->task, &fds[i++]); + + if (poll(fds, fdsz, 5000) < 0) + log_warn("poll: %s", strerror(errno)); + + for (iter = tasks, i = 0; i < fdsz; ++i) { + next = iter->next; + + /* + * 0: EOF [wait] + * -1: error [kill + wait] + * >0: keep going [nothing] + */ + if ((ret = task_sync(iter->task, &fds[i])) < 0) { + log_warn(TAG "pipe error: %s, killing task", strerror(errno)); + + if (task_kill(iter->task) < 0) + log_warn(TAG "task kill error: %s", strerror(errno)); + } + + /* Now wait for the task to complete. */ + if (ret <= 0) { + if (task_wait(iter->task) < 0) + log_warn(TAG "task wait error: %s", strerror(errno)); + else { + code = task_code(iter->task); + + switch (task_status(iter->task)) { + case TASKSTATUS_EXITED: + log_info(TAG "task %lld exited with code %d", + (long long int)task_pid(iter->task), code.exitcode); + break; + case TASKSTATUS_KILLED: + log_info(TAG "task %lld killed with signal %d", + (long long int)task_pid(iter->task), code.sigcode); + break; + default: + break; + } + } + + /* Remove that task and push to the outgoing queue. */ + next = iter->next; + LL_DELETE(tasks, iter); + LL_APPEND(taskfinished, iter); + } + } + + free(fds); +} + +/* + * Kill all tasks that have been running for too long. + */ +static void +ghost_all(void) +{ + struct taskentry *iter, *tmp; + time_t now; + + LL_FOREACH_SAFE(tasks, iter, tmp) { + if (difftime(time(NULL), task_uptime(iter->task)) < sciworkerd.timeout) + continue; + + /* Do not attempt to wait if kill failed to avoid lock. */ + log_info(TAG "task timeout, killing"); + + if (task_kill(iter->task) == 0) + task_wait(iter->task); + + LL_DELETE(tasks, iter); + LL_APPEND(taskfinished, iter); + } +} + +static int +publish(struct taskentry *iter) +{ + // TODO: add sigcode. + struct taskcode code = task_code(iter->task); + struct jobresult res = { + .job_id = iter->job.id, + .exitcode = code.exitcode, + .log = task_console(iter->task), + .worker_id = worker.id + }; + struct apicreq req; + json_t *doc; + int ret; + + doc = jobresult_to(&res, 1); + ret = apic_post(&req, doc, "%s/api/v1/jobs", sciworkerd.url); + json_decref(doc); + + if (ret) + log_warn(TAG "unable to publish task: %s", req.error); + else + log_info(TAG "task successfully published"); + + return ret; +} + +static void +publish_all(void) +{ + struct taskentry *iter, *tmp; + + LL_FOREACH_SAFE(taskfinished, iter, tmp) { + if (publish(iter) == 0) { + LL_DELETE(taskfinished, iter); + taskentry_free(iter); + } + } +} + +void +sciworkerd_init(void) +{ + struct sigaction sa = {0}; + + log_open("sigworkerd"); + + sigemptyset(&sa.sa_mask); + sa.sa_handler = stop; + + if (sigaction(SIGINT, &sa, NULL) < 0 || sigaction(SIGTERM, &sa, NULL) < 0) + log_die(TAG "sigaction: %s", strerror(errno)); + + fetch_worker(); +} + +void +sciworkerd_run(void) +{ + while (run) { + fetch_jobs(); + process_all(); + ghost_all(); + publish_all(); + } +} + +void +sciworkerd_finish(void) +{ + struct taskentry *iter, *tmp; + + LL_FOREACH_SAFE(taskpending, iter, tmp) + taskentry_free(iter); + LL_FOREACH_SAFE(tasks, iter, tmp) + taskentry_free(iter); + LL_FOREACH_SAFE(taskfinished, iter, tmp) + taskentry_free(iter); +}