Mercurial > sci
view sciworkerd.c @ 3:215c0c3b3609
misc: use JSON everywhere (scictl/sciwebd)
author | David Demelier <markand@malikania.fr> |
---|---|
date | Mon, 14 Jun 2021 22:08:24 +0200 |
parents | 5fa3d2f479b2 |
children | eb76429ce112 |
line wrap: on
line source
#include <sys/queue.h> #include <sys/types.h> #include <sys/wait.h> #include <assert.h> #include <err.h> #include <errno.h> #include <fcntl.h> #include <limits.h> #include <poll.h> #include <signal.h> #include <stdio.h> #include <stdlib.h> #include <stdnoreturn.h> #include <string.h> #include <unistd.h> #include <curl/curl.h> #include <jansson.h> #include "config.h" #include "log.h" #include "types.h" #include "util.h" #define TAG_MAX 256 enum taskst { TASKST_PENDING, /* not started yet. */ TASKST_RUNNING, /* currently running. */ TASKST_COMPLETED, /* completed but not synced yet. */ TASKST_SYNCING /* was unable to send result to host. */ }; struct task { enum taskst status; pid_t child; int pipe[2]; int exitcode; int job_id; int project_id; char job_tag[TAG_MAX]; char out[SCI_CONSOLE_MAX]; char script[PATH_MAX]; int scriptfd; TAILQ_ENTRY(task) link; }; TAILQ_HEAD(tasks, task); struct fds { struct pollfd *list; size_t listsz; }; struct result { pid_t pid; int status; }; struct fetch { char buf[SCI_MSG_MAX]; FILE *bufp; }; static struct { char *url; char *worker; int maxbuilds; } config = { .url = "http://localhost", .worker = "default", .maxbuilds = 4 }; static struct tasks tasks = TAILQ_HEAD_INITIALIZER(tasks); static struct worker worker; #if 0 static int sigpipe[2]; #endif noreturn static void usage(void) { fprintf(stderr, "usage: %s [-m maxbuild] [-u url] [-w worker]\n", getprogname()); exit(1); } static inline struct task * find_by_fd(int fd) { struct task *tk; TAILQ_FOREACH(tk, &tasks, link) if (tk->pipe[0] == fd) return tk; return NULL; } static inline struct task * find_by_pid(pid_t pid) { struct task *t; TAILQ_FOREACH(t, &tasks, link) if (t->child == pid) return t; return NULL; } static void destroy(struct task *tk) { log_debug("destroying task %d", tk->job_id); if (tk->pipe[0]) close(tk->pipe[0]); if (tk->pipe[1]) close(tk->pipe[1]); if (tk->scriptfd) { unlink(tk->script); close(tk->scriptfd); } TAILQ_REMOVE(&tasks, tk, link); memset(tk, 0, sizeof (*tk)); free(tk); } static int spawn(struct task *tk) { if (pipe(tk->pipe) < 0) goto cleanup; switch ((tk->child = fork())) { case -1: log_warn("spawn: %s", strerror(errno)); goto cleanup; case 0: /* Child. */ dup2(tk->pipe[1], STDOUT_FILENO); dup2(tk->pipe[1], STDERR_FILENO); close(tk->pipe[0]); close(tk->pipe[1]); log_debug("spawn: running process (%lld) %s", (long long int)tk->child, tk->script); tk->status = TASKST_RUNNING; if (execl(tk->script, tk->script, tk->job_tag, NULL) < 0) { tk->status = TASKST_PENDING; log_warn("exec %s: %s", tk->script, strerror(errno)); exit(0); } break; default: /* Parent */ break; } return 0; cleanup: destroy(tk); return -1; } static const char * makeurl(const char *fmt, ...) { assert(fmt); static char url[256]; char page[128] = {0}; va_list ap; va_start(ap, fmt); vsnprintf(page, sizeof (page), fmt, ap); va_end(ap); snprintf(url, sizeof (url), "%s/%s", config.url, page); return url; } static void complete(int signum, siginfo_t *sinfo, void *ctx) { (void)ctx; (void)signum; #if 0 struct result r; struct task *tk; #endif struct task *tk; int status = 0; if (sinfo->si_code != CLD_EXITED) return; #if 0 r.pid = sinfo->si_pid; r.status = 0; #endif if (waitpid(sinfo->si_pid, &status, 0) < 0) { log_warn("waitpid: %s", strerror(errno)); return; } if ((tk = find_by_pid(sinfo->si_pid))) { log_debug("process %lld completed", (long long int)sinfo->si_pid); close(tk->pipe[1]); tk->status = TASKST_COMPLETED; tk->exitcode = status; tk->pipe[1] = 0; } #if 0 /* * Signal may happen at any time from any thread so we can't use * mutexes so use the good old self-pipe trick. Yes, signals are * probably the most fundamental broken UNIX feature. */ if (write(sigpipe[1], &r, sizeof (r)) < 0) err(1, "write"); #endif } static const char * uploadenc(const struct task *tk) { json_t *doc; struct jobresult res = {0}; char *dump; res.job_id = tk->job_id; res.exitcode = tk->exitcode; res.log = tk->out; res.worker_id = worker.id; doc = jobresult_to(&res, 1); dump = json_dumps(doc, JSON_COMPACT); json_decref(doc); return dump; } static size_t getcb(char *in, size_t n, size_t w, FILE *fp) { if (fwrite(in, n, w, fp) != w) return log_warn("get: %s", strerror(errno)), 0; return w; } static json_t * get(const char *topic, const char *url) { CURL *curl; CURLcode code; json_t *doc; json_error_t error; char buf[SCI_MSG_MAX]; long status; FILE *fp; curl = curl_easy_init(); if (!(fp = fmemopen(buf, sizeof (buf), "w"))) err(1, "fmemopen"); curl_easy_setopt(curl, CURLOPT_URL, url); curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L); curl_easy_setopt(curl, CURLOPT_TIMEOUT, 3L); curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, getcb); curl_easy_setopt(curl, CURLOPT_WRITEDATA, fp); if ((code = curl_easy_perform(curl)) != CURLE_OK) log_warn("%s: %s", topic, curl_easy_strerror(code)); curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status); curl_easy_cleanup(curl); fclose(fp); if (code != CURLE_OK) return log_warn("%s: %s", topic, curl_easy_strerror(code)), NULL; if (status != 200) return log_warn("%s: unexpected status code %ld", topic, status), NULL; if (!(doc = json_loads(buf, 0, &error))) return log_warn("%s: %s", topic, error.text), NULL; return doc; } static size_t silent(char *in, size_t n, size_t w, void *data) { (void)in; (void)n; (void)data; return w; } static void upload(struct task *tk) { CURL *curl; CURLcode code; long status; curl = curl_easy_init(); curl_easy_setopt(curl, CURLOPT_URL, makeurl("api/v1/jobs")); curl_easy_setopt(curl, CURLOPT_TIMEOUT, 3L); curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L); curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, silent); curl_easy_setopt(curl, CURLOPT_POSTFIELDS, uploadenc(tk)); code = curl_easy_perform(curl); /* * If we fail to upload data, we put the result into syncing mode so * that we retry later without redoing the job over and over */ tk->status = TASKST_SYNCING; if (code != CURLE_OK) log_warn("upload: %s", curl_easy_strerror(code)); else { curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status); if (status != 200) log_warn("upload: unexpected return code: %ld", status); else destroy(tk); } curl_easy_cleanup(curl); } static inline void finished(struct task *tk) { log_info("task %d: completed with exit code %d", tk->child, tk->exitcode); printf("== OUTPUT ==\n"); puts(tk->out); upload(tk); } static inline int pending(int id) { struct task *t; TAILQ_FOREACH(t, &tasks, link) if (t->job_id == id) return 1; return 0; } static void queue(int id, int project_id, const char *tag) { struct task *tk; log_info("queued job build (%d) for tag %s\n", id, tag); tk = util_calloc(1, sizeof (*tk)); tk->job_id = id; tk->project_id = project_id; strlcpy(tk->job_tag, tag, sizeof (tk->job_tag)); TAILQ_INSERT_TAIL(&tasks, tk, link); } static void merge(json_t *doc) { struct job jobs[SCI_JOB_LIST_MAX]; ssize_t jobsz; if ((jobsz = job_from(jobs, UTIL_SIZE(jobs), doc)) < 0) log_warn("fetchjobs: %s", strerror(errno)); else { for (ssize_t i = 0; i < jobsz; ++i) { if (!pending(jobs[i].id)) queue(jobs[i].id, jobs[i].project_id, jobs[i].tag); } } json_decref(doc); } static void fetchjobs(void) { json_t *doc; if (!(doc = get("fetch", makeurl("api/v1/jobs/%s", config.worker)))) log_warn("unable to retrieve jobs"); else merge(doc); } /* * This function reads stdout/stderr pipe from child and optionally remove them * if they have completed. */ static void readall(struct fds *fds) { struct task *tk; char buf[BUFSIZ]; ssize_t nr; for (size_t i = 0; i < fds->listsz; ++i) { if (fds->list[i].revents == 0) continue; if (!(tk = find_by_fd(fds->list[i].fd))) continue; /* Read stdout/stderr from children pipe. */ if ((nr = read(fds->list[i].fd, buf, sizeof (buf) - 1)) <= 0) tk->status = TASKST_SYNCING; else { buf[nr] = 0; strlcat(tk->out, buf, sizeof (tk->out)); } } } /* * Retrieve status code from spawned process complete or upload again if they * failed to sync. */ static void flushall(void) { struct task *tk, *tmp; TAILQ_FOREACH_SAFE(tk, &tasks, link, tmp) if (tk->status == TASKST_SYNCING) upload(tk); } static int extract(struct task *tk, json_t *doc) { struct project proj; size_t len; if (project_from(&proj, 1, doc) < 0) { json_decref(doc); log_warn("fetchproject: %s", strerror(errno)); return -1; } len = strlen(proj.script); if ((size_t)write(tk->scriptfd, proj.script, len) != len) { json_decref(doc); log_warn("fetchproject: %s", strerror(errno)); return -1; } /* Close so we can finally spawn it. */ close(tk->scriptfd); tk->scriptfd = 0; return 0; } static int fetchproject(struct task *tk) { json_t *doc; if (!(doc = get("fetchproject", makeurl("api/v1/projects/%d", tk->project_id)))) return -1; return extract(tk, doc); } /* * Create a task to run the script. This will retrieve the project script code * at this moment and put it in a temporary file. */ static void createtask(struct task *tk) { if (tk->status != TASKST_PENDING) return; log_debug("creating task (id=%d, tag=%s)", tk->job_id, tk->job_tag); snprintf(tk->script, sizeof (tk->script), "/tmp/sciworkerd-%d-XXXXXX", tk->job_id); if ((tk->scriptfd = mkstemp(tk->script)) < 0 || fchmod(tk->scriptfd, S_IRUSR | S_IWUSR | S_IXUSR) < 0) { unlink(tk->script); log_warn("%s", strerror(errno)); return; } if (fetchproject(tk) < 0) { unlink(tk->script); close(tk->scriptfd); tk->scriptfd = 0; } else spawn(tk); } /* * Start all pending tasks if the limit of running tasks is not reached. */ static void startall(void) { size_t nrunning = 0; struct task *tk; TAILQ_FOREACH(tk, &tasks, link) if (tk->status == TASKST_RUNNING) ++nrunning; if (nrunning >= (size_t)config.maxbuilds) log_debug("not spawning new process because limit is reached"); else { tk = TAILQ_FIRST(&tasks); while (tk && nrunning++ < (size_t)config.maxbuilds) { createtask(tk); tk = TAILQ_NEXT(tk, link); } } } static void fetchworker(void) { json_t *doc; if (!(doc = get("fetchworker", makeurl("api/v1/workers/%s", config.worker))) || worker_from(&worker, 1, doc) < 0) errx(1, "unable to retrieve worker id"); log_info("worker id: %d", worker.id); log_info("worker name: %s", worker.name); log_info("worker description: %s", worker.desc); json_decref(doc); } static void init(void) { struct sigaction sa; sa.sa_flags = SA_SIGINFO; sa.sa_sigaction = complete; sigemptyset(&sa.sa_mask); if (sigaction(SIGCHLD, &sa, NULL) < 0) err(1, "sigaction"); log_open("sciworkerd"); fetchworker(); #if 0 if (pipe(sigpipe) < 0) err(1, "pipe"); if ((flags = fcntl(sigpipe[1], F_GETFL, 0)) < 0 || fcntl(sigpipe[1], F_SETFL, flags | O_NONBLOCK) < 0) err(1, "fcntl"); #endif } static struct fds prepare(void) { struct fds fds = {0}; struct task *tk; size_t i = 0; TAILQ_FOREACH(tk, &tasks, link) if (tk->status == TASKST_RUNNING || tk->status == TASKST_COMPLETED) fds.listsz++; fds.list = util_calloc(fds.listsz, sizeof (*fds.list)); #if 0 fds.list[0].fd = sigpipe[0]; fds.list[0].events = POLLIN; #endif TAILQ_FOREACH(tk, &tasks, link) { if (tk->status == TASKST_RUNNING || tk->status == TASKST_COMPLETED) { fds.list[i].fd = tk->pipe[0]; fds.list[i++].events = POLLIN | POLLPRI; } } return fds; } static void run(void) { struct fds fds; fds = prepare(); if (poll(fds.list, fds.listsz, 5000) < 0 && errno != EINTR) err(1, "poll"); fetchjobs(); readall(&fds); startall(); flushall(); } int main(int argc, char **argv) { int ch; const char *errstr; setprogname("sciworkerd"); while ((ch = getopt(argc, argv, "m:u:w:")) != -1) { switch (ch) { case 'm': config.maxbuilds = strtonum(optarg, 0, INT_MAX, &errstr); if (errstr) errx(1, "%s: %s", optarg, errstr); break; case 'u': config.url = optarg; break; case 'w': config.worker = optarg; break; default: usage(); break; } } init(); for (;;) run(); }