Mercurial > sci
diff sciworkerd/main.c @ 18:600204c31bf0
misc: refactor
author | David Demelier <markand@malikania.fr> |
---|---|
date | Tue, 12 Jul 2022 20:20:51 +0200 |
parents | sciworkerd.c@40fe70256fb0 |
children | de4bf839b565 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sciworkerd/main.c Tue Jul 12 20:20:51 2022 +0200 @@ -0,0 +1,734 @@ +/* + * sciworkerd.c -- main sciworkerd(8) program file + * + * Copyright (c) 2021 David Demelier <markand@malikania.fr> + * + * Permission to use, copy, modify, and/or distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +#if 0 + +#include <sys/queue.h> +#include <sys/stat.h> +#include <sys/types.h> +#include <sys/wait.h> +#include <assert.h> +#include <err.h> +#include <errno.h> +#include <fcntl.h> +#include <limits.h> +#include <poll.h> +#include <signal.h> +#include <stdio.h> +#include <stdlib.h> +#include <stdnoreturn.h> +#include <string.h> +#include <unistd.h> + +#include <curl/curl.h> +#include <jansson.h> + +#include "config.h" +#include "log.h" +#include "types.h" +#include "util.h" + +#define TAG_MAX 256 + +struct task { + enum taskst status; + pid_t child; + int pipe[2]; + int exitcode; + int job_id; + int project_id; + char job_tag[TAG_MAX]; + char out[SCI_CONSOLE_MAX]; + char script[PATH_MAX]; + int scriptfd; + TAILQ_ENTRY(task) link; +}; + +TAILQ_HEAD(tasks, task); + +struct fds { + struct pollfd *list; + size_t listsz; +}; + +struct fetch { + char buf[SCI_MSG_MAX]; + FILE *bufp; +}; + +static struct { + char *url; + char *worker; + int maxbuilds; +} config = { + .url = "http://localhost", + .worker = "default", + .maxbuilds = 4 +}; + +static struct tasks tasks = TAILQ_HEAD_INITIALIZER(tasks); +static struct worker worker; +static int alive = 1; + +/* + * Show usage and exit with code 1. + */ +noreturn static void +usage(void) +{ + fprintf(stderr, "usage: %s [-m maxbuild] [-u url] [-w worker]\n", getprogname()); + exit(1); +} + +/* + * Find a task by its id. + */ +static inline struct task * +find_by_fd(int fd) +{ + struct task *tk; + + TAILQ_FOREACH(tk, &tasks, link) + if (tk->pipe[0] == fd) + return tk; + + return NULL; +} + +/* + * Find a task by its pid number. + */ +static inline struct task * +find_by_pid(pid_t pid) +{ + struct task *t; + + TAILQ_FOREACH(t, &tasks, link) + if (t->child == pid) + return t; + + return NULL; +} + +/* + * Destroy a task entirely. + */ +static void +destroy(struct task *tk) +{ + log_debug("destroying task %d", tk->job_id); + unlink(tk->script); + + if (tk->pipe[0]) + close(tk->pipe[0]); + if (tk->pipe[1]) + close(tk->pipe[1]); + if (tk->scriptfd) + close(tk->scriptfd); + + TAILQ_REMOVE(&tasks, tk, link); + memset(tk, 0, sizeof (*tk)); + free(tk); +} + +static const char * +makeurl(const char *fmt, ...) +{ + assert(fmt); + + static char url[256]; + char page[128] = {0}; + va_list ap; + + va_start(ap, fmt); + vsnprintf(page, sizeof (page), fmt, ap); + va_end(ap); + + snprintf(url, sizeof (url), "%s/%s", config.url, page); + + return url; +} + +static void +complete(int signum, siginfo_t *sinfo, void *ctx) +{ + (void)ctx; + (void)signum; + + struct task *tk; + + if (waitpid(sinfo->si_pid, NULL, 0) < 0) + log_warn("waitpid: %s", strerror(errno)); + + if ((tk = find_by_pid(sinfo->si_pid))) { + log_debug("process %d terminated (exitcode=%d)", + (int)sinfo->si_pid, sinfo->si_status); + + close(tk->pipe[1]); + tk->status = TASKST_COMPLETED; + tk->exitcode = sinfo->si_status; + tk->pipe[1] = 0; + } +} + +static void +stop(int signum) +{ + log_warn("exiting on signal %d", signum); + alive = 0; +} + +static char * +uploadenc(const struct task *tk) +{ + json_t *doc; + + struct jobresult res = {0}; + char *dump; + + res.job_id = tk->job_id; + res.exitcode = tk->exitcode; + res.log = tk->out; + res.worker_id = worker.id; + + doc = jobresult_to(&res, 1); + dump = json_dumps(doc, JSON_COMPACT); + + json_decref(doc); + + return dump; +} + +static size_t +getcb(char *in, size_t n, size_t w, FILE *fp) +{ + if (fwrite(in, n, w, fp) != w) + return log_warn("get: %s", strerror(errno)), 0; + + return w; +} + +static json_t * +get(const char *topic, const char *url) +{ + CURL *curl; + CURLcode code; + + json_t *doc; + json_error_t error; + + char buf[SCI_MSG_MAX]; + long status; + FILE *fp; + + curl = curl_easy_init(); + + if (!(fp = fmemopen(buf, sizeof (buf), "w"))) + err(1, "fmemopen"); + + curl_easy_setopt(curl, CURLOPT_URL, url); + curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L); + curl_easy_setopt(curl, CURLOPT_TIMEOUT, 3L); + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, getcb); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, fp); + + if ((code = curl_easy_perform(curl)) != CURLE_OK) + log_warn("%s: %s", topic, curl_easy_strerror(code)); + + curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status); + curl_easy_cleanup(curl); + + fclose(fp); + + if (code != CURLE_OK) + return log_warn("%s: %s", topic, curl_easy_strerror(code)), NULL; + if (status != 200) + return log_warn("%s: unexpected status code %ld", topic, status), NULL; + if (!(doc = json_loads(buf, 0, &error))) + return log_warn("%s: %s", topic, error.text), NULL; + + return doc; +} + +static size_t +silent(char *in, size_t n, size_t w, void *data) +{ + (void)in; + (void)n; + (void)data; + + return w; +} + +static void +upload(struct task *tk) +{ + CURL *curl; + CURLcode code; + struct curl_slist *headers = NULL; + long status; + char *dump; + + curl = curl_easy_init(); + headers = curl_slist_append(headers, "Content-Type: application/json"); + curl_easy_setopt(curl, CURLOPT_URL, makeurl("api/v1/jobs")); + //curl_easy_setopt(curl, CURLOPT_URL, "http://localhost:4000"); + curl_easy_setopt(curl, CURLOPT_TIMEOUT, 3L); + curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L); + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, silent); + curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (dump = uploadenc(tk))); + curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, strlen(dump)); + curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers); + code = curl_easy_perform(curl); + curl_slist_free_all(headers); + + /* + * If we fail to upload data, we put the result into syncing mode so + * that we retry later without redoing the job over and over + */ + tk->status = TASKST_SYNCING; + + if (code != CURLE_OK) + log_warn("upload: %s", curl_easy_strerror(code)); + else { + curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status); + + if (status != 200) + log_warn("upload: unexpected return code: %ld", status); + else + destroy(tk); + } + + free(dump); + curl_easy_cleanup(curl); +} + +static inline int +pending(int id) +{ + struct task *t; + + TAILQ_FOREACH(t, &tasks, link) + if (t->job_id == id) + return 1; + + return 0; +} + +static void +queue(int id, int project_id, const char *tag) +{ + struct task *tk; + + log_info("queued job build (%d) for tag %s\n", id, tag); + + tk = util_calloc(1, sizeof (*tk)); + tk->job_id = id; + tk->project_id = project_id; + strlcpy(tk->job_tag, tag, sizeof (tk->job_tag)); + + TAILQ_INSERT_TAIL(&tasks, tk, link); +} + +static void +merge(json_t *doc) +{ + struct job jobs[SCI_JOB_LIST_MAX]; + ssize_t jobsz; + + if ((jobsz = job_from(jobs, UTIL_SIZE(jobs), doc)) < 0) + log_warn("fetchjobs: %s", strerror(errno)); + else { + for (ssize_t i = 0; i < jobsz; ++i) { + if (!pending(jobs[i].id)) + queue(jobs[i].id, jobs[i].project_id, jobs[i].tag); + } + } + + json_decref(doc); +} + +static void +fetchjobs(void) +{ + json_t *doc; + + if (!(doc = get("fetch", makeurl("api/v1/jobs/%s", config.worker)))) + log_warn("unable to retrieve jobs"); + else + merge(doc); +} + +/* + * This function reads stdout/stderr pipe from child and optionally remove them + * if they have completed. + */ +static void +readall(struct fds *fds) +{ + struct task *tk; + char buf[BUFSIZ]; + ssize_t nr; + + for (size_t i = 0; i < fds->listsz; ++i) { + if (fds->list[i].revents == 0) + continue; + if (!(tk = find_by_fd(fds->list[i].fd))) + continue; + + /* Read stdout/stderr from children pipe. */ + if ((nr = read(fds->list[i].fd, buf, sizeof (buf) - 1)) <= 0) + tk->status = TASKST_SYNCING; + else { + buf[nr] = 0; + strlcat(tk->out, buf, sizeof (tk->out)); + } + } +} + +/* + * Retrieve status code from spawned process complete or upload again if they + * failed to sync. + */ +static void +flushall(void) +{ + struct task *tk, *tmp; + + TAILQ_FOREACH_SAFE(tk, &tasks, link, tmp) + if (tk->status == TASKST_SYNCING) + upload(tk); +} + +static int +extract(struct task *tk, json_t *doc) +{ + struct project proj; + size_t len; + + if (project_from(&proj, 1, doc) < 0) { + json_decref(doc); + log_warn("fetchproject: %s", strerror(errno)); + return -1; + } + + len = strlen(proj.script); + + if ((size_t)write(tk->scriptfd, proj.script, len) != len) { + json_decref(doc); + log_warn("fetchproject: %s", strerror(errno)); + return -1; + } + + /* Close so we can finally spawn it. */ + close(tk->scriptfd); + tk->scriptfd = 0; + + return 0; +} + +static int +fetchproject(struct task *tk) +{ + json_t *doc; + + if (!(doc = get("fetchproject", makeurl("api/v1/projects/%d", tk->project_id)))) + return -1; + + return extract(tk, doc); +} + +/* + * Create a task to run the script. This will retrieve the project script code + * at this moment and put it in a temporary file. + */ +static void +createtask(struct task *tk) +{ + if (tk->status != TASKST_PENDING) + return; + + log_debug("creating task (id=%d, tag=%s)", tk->job_id, tk->job_tag); + snprintf(tk->script, sizeof (tk->script), "/tmp/sciworkerd-%d-XXXXXX", tk->job_id); + + if ((tk->scriptfd = mkstemp(tk->script)) < 0 || + fchmod(tk->scriptfd, S_IRUSR | S_IWUSR | S_IXUSR) < 0) { + unlink(tk->script); + log_warn("%s", strerror(errno)); + return; + } + + if (fetchproject(tk) < 0) { + unlink(tk->script); + close(tk->scriptfd); + tk->scriptfd = 0; + } else + spawn(tk); +} + +/* + * Start all pending tasks if the limit of running tasks is not reached. + */ +static void +startall(void) +{ + size_t nrunning = 0; + struct task *tk; + + TAILQ_FOREACH(tk, &tasks, link) + if (tk->status == TASKST_RUNNING) + ++nrunning; + + if (nrunning >= (size_t)config.maxbuilds) + log_debug("not spawning new process because limit is reached"); + else { + tk = TAILQ_FIRST(&tasks); + + while (tk && nrunning++ < (size_t)config.maxbuilds) { + createtask(tk); + tk = TAILQ_NEXT(tk, link); + } + } +} + +static void +fetchworker(void) +{ + json_t *doc; + + if (!(doc = get("fetchworker", makeurl("api/v1/workers/%s", config.worker))) || + worker_from(&worker, 1, doc) < 0) + errx(1, "unable to retrieve worker id"); + + log_info("worker id: %d", worker.id); + log_info("worker name: %s", worker.name); + log_info("worker description: %s", worker.desc); + + json_decref(doc); +} + +static void +init(void) +{ + struct sigaction sa; + + sa.sa_flags = SA_SIGINFO | SA_RESTART; + sa.sa_sigaction = complete; + sigemptyset(&sa.sa_mask); + + if (sigaction(SIGCHLD, &sa, NULL) < 0) + err(1, "sigaction"); + + sa.sa_flags = SA_RESTART; + sa.sa_handler = stop; + sigemptyset(&sa.sa_mask); + + if (sigaction(SIGTERM, &sa, NULL) < 0 || sigaction(SIGINT, &sa, NULL) < 0) + err(1, "sigaction"); + + log_open("sciworkerd"); + fetchworker(); +} + +static struct fds +prepare(void) +{ + struct fds fds = {0}; + struct task *tk; + size_t i = 0; + + TAILQ_FOREACH(tk, &tasks, link) + if (tk->status == TASKST_RUNNING || tk->status == TASKST_COMPLETED) + fds.listsz++; + + fds.list = util_calloc(fds.listsz, sizeof (*fds.list)); + + TAILQ_FOREACH(tk, &tasks, link) { + if (tk->status == TASKST_RUNNING || tk->status == TASKST_COMPLETED) { + fds.list[i].fd = tk->pipe[0]; + fds.list[i++].events = POLLIN | POLLPRI; + } + } + + return fds; +} + +static void +run(void) +{ + struct fds fds; + + fds = prepare(); + + if (poll(fds.list, fds.listsz, 5000) < 0 && errno != EINTR) + err(1, "poll"); + + fetchjobs(); + readall(&fds); + startall(); + flushall(); +} + +static void +finish(void) +{ + size_t tot = 0; + struct task *tk, *tmp; + + TAILQ_FOREACH(tk, &tasks, link) + tot++; + + signal(SIGCHLD, SIG_IGN); + log_debug("killing remaining %zu tasks", tot); + + TAILQ_FOREACH_SAFE(tk, &tasks, link, tmp) { + if (tk->status == TASKST_RUNNING) { + kill(tk->child, SIGTERM); + waitpid(tk->child, NULL, 0); + } + + destroy(tk); + } +} + +int +main(int argc, char **argv) +{ + int ch; + const char *errstr; + + setprogname("sciworkerd"); + + while ((ch = getopt(argc, argv, "m:u:w:")) != -1) { + switch (ch) { + case 'm': + config.maxbuilds = strtonum(optarg, 0, INT_MAX, &errstr); + + if (errstr) + errx(1, "%s: %s", optarg, errstr); + + break; + case 'u': + config.url = optarg; + break; + case 'w': + config.worker = optarg; + break; + default: + usage(); + break; + } + } + + init(); + + while (alive) + run(); + + finish(); +} +#endif + + + + + + + + + + +#include <err.h> +#include <errno.h> +#include <poll.h> +#include <signal.h> +#include <string.h> +#include <time.h> +#include <unistd.h> + +#include "types.h" +#include "task.h" + +#define SCRIPT \ + "#!/bin/sh\n" \ + "echo yes\n" \ + "sleep 10\n" \ + "echo no 1>&2\n" \ + "sleep 1\n" \ + "exit 1" + +int +main(void) +{ + struct job job = { + .project_id = 10, + .id = 10, + .tag = "1234" + }; + struct sigaction sa = {0}; + struct pollfd fd; + struct task *t; + int run = 1; + + t = task_new(&job); + + if (task_setup(t, SCRIPT) < 0) + err(1, "task_set_script"); + if (task_start(t) < 0) + err(1, "task_start"); + + while (run) { + if (difftime(time(NULL), task_uptime(t)) >= 3) { + printf("task timeout !\n"); + task_kill(t); + task_wait(t); + break; + } + + task_prepare(t, &fd); + + if (poll(&fd, 1, 250) < 0 && errno != EINTR) + err(1, "poll"); + + switch (task_sync(t, &fd)) { + case -1: + err(1, "task_sync"); + case 0: + run = 0; + task_wait(t); + break; + default: + /* Keep going... */ + break; + } + } + + switch (task_status(t)) { + case TASKSTATUS_EXITED: + printf("process exited with code: %d\n", task_code(t).exitcode); + break; + case TASKSTATUS_KILLED: + printf("process killed with signal %d\n", task_code(t).sigcode); + break; + default: + break; + } + + printf("== console ==\n%s==\n", task_console(t)); + task_free(t); +}