Mercurial > sci
view sciworkerd/main.c @ 18:600204c31bf0
misc: refactor
author | David Demelier <markand@malikania.fr> |
---|---|
date | Tue, 12 Jul 2022 20:20:51 +0200 |
parents | sciworkerd.c@40fe70256fb0 |
children | de4bf839b565 |
line wrap: on
line source
/* * sciworkerd.c -- main sciworkerd(8) program file * * Copyright (c) 2021 David Demelier <markand@malikania.fr> * * Permission to use, copy, modify, and/or distribute this software for any * purpose with or without fee is hereby granted, provided that the above * copyright notice and this permission notice appear in all copies. * * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. */ #if 0 #include <sys/queue.h> #include <sys/stat.h> #include <sys/types.h> #include <sys/wait.h> #include <assert.h> #include <err.h> #include <errno.h> #include <fcntl.h> #include <limits.h> #include <poll.h> #include <signal.h> #include <stdio.h> #include <stdlib.h> #include <stdnoreturn.h> #include <string.h> #include <unistd.h> #include <curl/curl.h> #include <jansson.h> #include "config.h" #include "log.h" #include "types.h" #include "util.h" #define TAG_MAX 256 struct task { enum taskst status; pid_t child; int pipe[2]; int exitcode; int job_id; int project_id; char job_tag[TAG_MAX]; char out[SCI_CONSOLE_MAX]; char script[PATH_MAX]; int scriptfd; TAILQ_ENTRY(task) link; }; TAILQ_HEAD(tasks, task); struct fds { struct pollfd *list; size_t listsz; }; struct fetch { char buf[SCI_MSG_MAX]; FILE *bufp; }; static struct { char *url; char *worker; int maxbuilds; } config = { .url = "http://localhost", .worker = "default", .maxbuilds = 4 }; static struct tasks tasks = TAILQ_HEAD_INITIALIZER(tasks); static struct worker worker; static int alive = 1; /* * Show usage and exit with code 1. */ noreturn static void usage(void) { fprintf(stderr, "usage: %s [-m maxbuild] [-u url] [-w worker]\n", getprogname()); exit(1); } /* * Find a task by its id. */ static inline struct task * find_by_fd(int fd) { struct task *tk; TAILQ_FOREACH(tk, &tasks, link) if (tk->pipe[0] == fd) return tk; return NULL; } /* * Find a task by its pid number. */ static inline struct task * find_by_pid(pid_t pid) { struct task *t; TAILQ_FOREACH(t, &tasks, link) if (t->child == pid) return t; return NULL; } /* * Destroy a task entirely. */ static void destroy(struct task *tk) { log_debug("destroying task %d", tk->job_id); unlink(tk->script); if (tk->pipe[0]) close(tk->pipe[0]); if (tk->pipe[1]) close(tk->pipe[1]); if (tk->scriptfd) close(tk->scriptfd); TAILQ_REMOVE(&tasks, tk, link); memset(tk, 0, sizeof (*tk)); free(tk); } static const char * makeurl(const char *fmt, ...) { assert(fmt); static char url[256]; char page[128] = {0}; va_list ap; va_start(ap, fmt); vsnprintf(page, sizeof (page), fmt, ap); va_end(ap); snprintf(url, sizeof (url), "%s/%s", config.url, page); return url; } static void complete(int signum, siginfo_t *sinfo, void *ctx) { (void)ctx; (void)signum; struct task *tk; if (waitpid(sinfo->si_pid, NULL, 0) < 0) log_warn("waitpid: %s", strerror(errno)); if ((tk = find_by_pid(sinfo->si_pid))) { log_debug("process %d terminated (exitcode=%d)", (int)sinfo->si_pid, sinfo->si_status); close(tk->pipe[1]); tk->status = TASKST_COMPLETED; tk->exitcode = sinfo->si_status; tk->pipe[1] = 0; } } static void stop(int signum) { log_warn("exiting on signal %d", signum); alive = 0; } static char * uploadenc(const struct task *tk) { json_t *doc; struct jobresult res = {0}; char *dump; res.job_id = tk->job_id; res.exitcode = tk->exitcode; res.log = tk->out; res.worker_id = worker.id; doc = jobresult_to(&res, 1); dump = json_dumps(doc, JSON_COMPACT); json_decref(doc); return dump; } static size_t getcb(char *in, size_t n, size_t w, FILE *fp) { if (fwrite(in, n, w, fp) != w) return log_warn("get: %s", strerror(errno)), 0; return w; } static json_t * get(const char *topic, const char *url) { CURL *curl; CURLcode code; json_t *doc; json_error_t error; char buf[SCI_MSG_MAX]; long status; FILE *fp; curl = curl_easy_init(); if (!(fp = fmemopen(buf, sizeof (buf), "w"))) err(1, "fmemopen"); curl_easy_setopt(curl, CURLOPT_URL, url); curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L); curl_easy_setopt(curl, CURLOPT_TIMEOUT, 3L); curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, getcb); curl_easy_setopt(curl, CURLOPT_WRITEDATA, fp); if ((code = curl_easy_perform(curl)) != CURLE_OK) log_warn("%s: %s", topic, curl_easy_strerror(code)); curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status); curl_easy_cleanup(curl); fclose(fp); if (code != CURLE_OK) return log_warn("%s: %s", topic, curl_easy_strerror(code)), NULL; if (status != 200) return log_warn("%s: unexpected status code %ld", topic, status), NULL; if (!(doc = json_loads(buf, 0, &error))) return log_warn("%s: %s", topic, error.text), NULL; return doc; } static size_t silent(char *in, size_t n, size_t w, void *data) { (void)in; (void)n; (void)data; return w; } static void upload(struct task *tk) { CURL *curl; CURLcode code; struct curl_slist *headers = NULL; long status; char *dump; curl = curl_easy_init(); headers = curl_slist_append(headers, "Content-Type: application/json"); curl_easy_setopt(curl, CURLOPT_URL, makeurl("api/v1/jobs")); //curl_easy_setopt(curl, CURLOPT_URL, "http://localhost:4000"); curl_easy_setopt(curl, CURLOPT_TIMEOUT, 3L); curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L); curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, silent); curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (dump = uploadenc(tk))); curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, strlen(dump)); curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers); code = curl_easy_perform(curl); curl_slist_free_all(headers); /* * If we fail to upload data, we put the result into syncing mode so * that we retry later without redoing the job over and over */ tk->status = TASKST_SYNCING; if (code != CURLE_OK) log_warn("upload: %s", curl_easy_strerror(code)); else { curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status); if (status != 200) log_warn("upload: unexpected return code: %ld", status); else destroy(tk); } free(dump); curl_easy_cleanup(curl); } static inline int pending(int id) { struct task *t; TAILQ_FOREACH(t, &tasks, link) if (t->job_id == id) return 1; return 0; } static void queue(int id, int project_id, const char *tag) { struct task *tk; log_info("queued job build (%d) for tag %s\n", id, tag); tk = util_calloc(1, sizeof (*tk)); tk->job_id = id; tk->project_id = project_id; strlcpy(tk->job_tag, tag, sizeof (tk->job_tag)); TAILQ_INSERT_TAIL(&tasks, tk, link); } static void merge(json_t *doc) { struct job jobs[SCI_JOB_LIST_MAX]; ssize_t jobsz; if ((jobsz = job_from(jobs, UTIL_SIZE(jobs), doc)) < 0) log_warn("fetchjobs: %s", strerror(errno)); else { for (ssize_t i = 0; i < jobsz; ++i) { if (!pending(jobs[i].id)) queue(jobs[i].id, jobs[i].project_id, jobs[i].tag); } } json_decref(doc); } static void fetchjobs(void) { json_t *doc; if (!(doc = get("fetch", makeurl("api/v1/jobs/%s", config.worker)))) log_warn("unable to retrieve jobs"); else merge(doc); } /* * This function reads stdout/stderr pipe from child and optionally remove them * if they have completed. */ static void readall(struct fds *fds) { struct task *tk; char buf[BUFSIZ]; ssize_t nr; for (size_t i = 0; i < fds->listsz; ++i) { if (fds->list[i].revents == 0) continue; if (!(tk = find_by_fd(fds->list[i].fd))) continue; /* Read stdout/stderr from children pipe. */ if ((nr = read(fds->list[i].fd, buf, sizeof (buf) - 1)) <= 0) tk->status = TASKST_SYNCING; else { buf[nr] = 0; strlcat(tk->out, buf, sizeof (tk->out)); } } } /* * Retrieve status code from spawned process complete or upload again if they * failed to sync. */ static void flushall(void) { struct task *tk, *tmp; TAILQ_FOREACH_SAFE(tk, &tasks, link, tmp) if (tk->status == TASKST_SYNCING) upload(tk); } static int extract(struct task *tk, json_t *doc) { struct project proj; size_t len; if (project_from(&proj, 1, doc) < 0) { json_decref(doc); log_warn("fetchproject: %s", strerror(errno)); return -1; } len = strlen(proj.script); if ((size_t)write(tk->scriptfd, proj.script, len) != len) { json_decref(doc); log_warn("fetchproject: %s", strerror(errno)); return -1; } /* Close so we can finally spawn it. */ close(tk->scriptfd); tk->scriptfd = 0; return 0; } static int fetchproject(struct task *tk) { json_t *doc; if (!(doc = get("fetchproject", makeurl("api/v1/projects/%d", tk->project_id)))) return -1; return extract(tk, doc); } /* * Create a task to run the script. This will retrieve the project script code * at this moment and put it in a temporary file. */ static void createtask(struct task *tk) { if (tk->status != TASKST_PENDING) return; log_debug("creating task (id=%d, tag=%s)", tk->job_id, tk->job_tag); snprintf(tk->script, sizeof (tk->script), "/tmp/sciworkerd-%d-XXXXXX", tk->job_id); if ((tk->scriptfd = mkstemp(tk->script)) < 0 || fchmod(tk->scriptfd, S_IRUSR | S_IWUSR | S_IXUSR) < 0) { unlink(tk->script); log_warn("%s", strerror(errno)); return; } if (fetchproject(tk) < 0) { unlink(tk->script); close(tk->scriptfd); tk->scriptfd = 0; } else spawn(tk); } /* * Start all pending tasks if the limit of running tasks is not reached. */ static void startall(void) { size_t nrunning = 0; struct task *tk; TAILQ_FOREACH(tk, &tasks, link) if (tk->status == TASKST_RUNNING) ++nrunning; if (nrunning >= (size_t)config.maxbuilds) log_debug("not spawning new process because limit is reached"); else { tk = TAILQ_FIRST(&tasks); while (tk && nrunning++ < (size_t)config.maxbuilds) { createtask(tk); tk = TAILQ_NEXT(tk, link); } } } static void fetchworker(void) { json_t *doc; if (!(doc = get("fetchworker", makeurl("api/v1/workers/%s", config.worker))) || worker_from(&worker, 1, doc) < 0) errx(1, "unable to retrieve worker id"); log_info("worker id: %d", worker.id); log_info("worker name: %s", worker.name); log_info("worker description: %s", worker.desc); json_decref(doc); } static void init(void) { struct sigaction sa; sa.sa_flags = SA_SIGINFO | SA_RESTART; sa.sa_sigaction = complete; sigemptyset(&sa.sa_mask); if (sigaction(SIGCHLD, &sa, NULL) < 0) err(1, "sigaction"); sa.sa_flags = SA_RESTART; sa.sa_handler = stop; sigemptyset(&sa.sa_mask); if (sigaction(SIGTERM, &sa, NULL) < 0 || sigaction(SIGINT, &sa, NULL) < 0) err(1, "sigaction"); log_open("sciworkerd"); fetchworker(); } static struct fds prepare(void) { struct fds fds = {0}; struct task *tk; size_t i = 0; TAILQ_FOREACH(tk, &tasks, link) if (tk->status == TASKST_RUNNING || tk->status == TASKST_COMPLETED) fds.listsz++; fds.list = util_calloc(fds.listsz, sizeof (*fds.list)); TAILQ_FOREACH(tk, &tasks, link) { if (tk->status == TASKST_RUNNING || tk->status == TASKST_COMPLETED) { fds.list[i].fd = tk->pipe[0]; fds.list[i++].events = POLLIN | POLLPRI; } } return fds; } static void run(void) { struct fds fds; fds = prepare(); if (poll(fds.list, fds.listsz, 5000) < 0 && errno != EINTR) err(1, "poll"); fetchjobs(); readall(&fds); startall(); flushall(); } static void finish(void) { size_t tot = 0; struct task *tk, *tmp; TAILQ_FOREACH(tk, &tasks, link) tot++; signal(SIGCHLD, SIG_IGN); log_debug("killing remaining %zu tasks", tot); TAILQ_FOREACH_SAFE(tk, &tasks, link, tmp) { if (tk->status == TASKST_RUNNING) { kill(tk->child, SIGTERM); waitpid(tk->child, NULL, 0); } destroy(tk); } } int main(int argc, char **argv) { int ch; const char *errstr; setprogname("sciworkerd"); while ((ch = getopt(argc, argv, "m:u:w:")) != -1) { switch (ch) { case 'm': config.maxbuilds = strtonum(optarg, 0, INT_MAX, &errstr); if (errstr) errx(1, "%s: %s", optarg, errstr); break; case 'u': config.url = optarg; break; case 'w': config.worker = optarg; break; default: usage(); break; } } init(); while (alive) run(); finish(); } #endif #include <err.h> #include <errno.h> #include <poll.h> #include <signal.h> #include <string.h> #include <time.h> #include <unistd.h> #include "types.h" #include "task.h" #define SCRIPT \ "#!/bin/sh\n" \ "echo yes\n" \ "sleep 10\n" \ "echo no 1>&2\n" \ "sleep 1\n" \ "exit 1" int main(void) { struct job job = { .project_id = 10, .id = 10, .tag = "1234" }; struct sigaction sa = {0}; struct pollfd fd; struct task *t; int run = 1; t = task_new(&job); if (task_setup(t, SCRIPT) < 0) err(1, "task_set_script"); if (task_start(t) < 0) err(1, "task_start"); while (run) { if (difftime(time(NULL), task_uptime(t)) >= 3) { printf("task timeout !\n"); task_kill(t); task_wait(t); break; } task_prepare(t, &fd); if (poll(&fd, 1, 250) < 0 && errno != EINTR) err(1, "poll"); switch (task_sync(t, &fd)) { case -1: err(1, "task_sync"); case 0: run = 0; task_wait(t); break; default: /* Keep going... */ break; } } switch (task_status(t)) { case TASKSTATUS_EXITED: printf("process exited with code: %d\n", task_code(t).exitcode); break; case TASKSTATUS_KILLED: printf("process killed with signal %d\n", task_code(t).sigcode); break; default: break; } printf("== console ==\n%s==\n", task_console(t)); task_free(t); }