Mercurial > sci
diff sciworkerd/main.c @ 19:de4bf839b565
misc: revamp SQL
author | David Demelier <markand@malikania.fr> |
---|---|
date | Fri, 15 Jul 2022 11:11:48 +0200 |
parents | 600204c31bf0 |
children | 2cb228f23f53 |
line wrap: on
line diff
--- a/sciworkerd/main.c Tue Jul 12 20:20:51 2022 +0200 +++ b/sciworkerd/main.c Fri Jul 15 11:11:48 2022 +0200 @@ -16,719 +16,49 @@ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. */ -#if 0 - -#include <sys/queue.h> -#include <sys/stat.h> -#include <sys/types.h> -#include <sys/wait.h> -#include <assert.h> -#include <err.h> -#include <errno.h> -#include <fcntl.h> -#include <limits.h> -#include <poll.h> -#include <signal.h> #include <stdio.h> #include <stdlib.h> -#include <stdnoreturn.h> -#include <string.h> #include <unistd.h> -#include <curl/curl.h> -#include <jansson.h> - -#include "config.h" -#include "log.h" -#include "types.h" -#include "util.h" - -#define TAG_MAX 256 - -struct task { - enum taskst status; - pid_t child; - int pipe[2]; - int exitcode; - int job_id; - int project_id; - char job_tag[TAG_MAX]; - char out[SCI_CONSOLE_MAX]; - char script[PATH_MAX]; - int scriptfd; - TAILQ_ENTRY(task) link; -}; - -TAILQ_HEAD(tasks, task); - -struct fds { - struct pollfd *list; - size_t listsz; -}; - -struct fetch { - char buf[SCI_MSG_MAX]; - FILE *bufp; -}; - -static struct { - char *url; - char *worker; - int maxbuilds; -} config = { - .url = "http://localhost", - .worker = "default", - .maxbuilds = 4 -}; - -static struct tasks tasks = TAILQ_HEAD_INITIALIZER(tasks); -static struct worker worker; -static int alive = 1; - -/* - * Show usage and exit with code 1. - */ -noreturn static void -usage(void) -{ - fprintf(stderr, "usage: %s [-m maxbuild] [-u url] [-w worker]\n", getprogname()); - exit(1); -} - -/* - * Find a task by its id. - */ -static inline struct task * -find_by_fd(int fd) -{ - struct task *tk; - - TAILQ_FOREACH(tk, &tasks, link) - if (tk->pipe[0] == fd) - return tk; - - return NULL; -} - -/* - * Find a task by its pid number. - */ -static inline struct task * -find_by_pid(pid_t pid) -{ - struct task *t; - - TAILQ_FOREACH(t, &tasks, link) - if (t->child == pid) - return t; - - return NULL; -} - -/* - * Destroy a task entirely. - */ -static void -destroy(struct task *tk) -{ - log_debug("destroying task %d", tk->job_id); - unlink(tk->script); - - if (tk->pipe[0]) - close(tk->pipe[0]); - if (tk->pipe[1]) - close(tk->pipe[1]); - if (tk->scriptfd) - close(tk->scriptfd); - - TAILQ_REMOVE(&tasks, tk, link); - memset(tk, 0, sizeof (*tk)); - free(tk); -} - -static const char * -makeurl(const char *fmt, ...) -{ - assert(fmt); - - static char url[256]; - char page[128] = {0}; - va_list ap; - - va_start(ap, fmt); - vsnprintf(page, sizeof (page), fmt, ap); - va_end(ap); - - snprintf(url, sizeof (url), "%s/%s", config.url, page); - - return url; -} - -static void -complete(int signum, siginfo_t *sinfo, void *ctx) -{ - (void)ctx; - (void)signum; - - struct task *tk; - - if (waitpid(sinfo->si_pid, NULL, 0) < 0) - log_warn("waitpid: %s", strerror(errno)); - - if ((tk = find_by_pid(sinfo->si_pid))) { - log_debug("process %d terminated (exitcode=%d)", - (int)sinfo->si_pid, sinfo->si_status); - - close(tk->pipe[1]); - tk->status = TASKST_COMPLETED; - tk->exitcode = sinfo->si_status; - tk->pipe[1] = 0; - } -} - -static void -stop(int signum) -{ - log_warn("exiting on signal %d", signum); - alive = 0; -} - -static char * -uploadenc(const struct task *tk) -{ - json_t *doc; - - struct jobresult res = {0}; - char *dump; - - res.job_id = tk->job_id; - res.exitcode = tk->exitcode; - res.log = tk->out; - res.worker_id = worker.id; - - doc = jobresult_to(&res, 1); - dump = json_dumps(doc, JSON_COMPACT); - - json_decref(doc); - - return dump; -} - -static size_t -getcb(char *in, size_t n, size_t w, FILE *fp) -{ - if (fwrite(in, n, w, fp) != w) - return log_warn("get: %s", strerror(errno)), 0; - - return w; -} - -static json_t * -get(const char *topic, const char *url) -{ - CURL *curl; - CURLcode code; - - json_t *doc; - json_error_t error; - - char buf[SCI_MSG_MAX]; - long status; - FILE *fp; - - curl = curl_easy_init(); - - if (!(fp = fmemopen(buf, sizeof (buf), "w"))) - err(1, "fmemopen"); - - curl_easy_setopt(curl, CURLOPT_URL, url); - curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L); - curl_easy_setopt(curl, CURLOPT_TIMEOUT, 3L); - curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, getcb); - curl_easy_setopt(curl, CURLOPT_WRITEDATA, fp); - - if ((code = curl_easy_perform(curl)) != CURLE_OK) - log_warn("%s: %s", topic, curl_easy_strerror(code)); - - curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status); - curl_easy_cleanup(curl); - - fclose(fp); - - if (code != CURLE_OK) - return log_warn("%s: %s", topic, curl_easy_strerror(code)), NULL; - if (status != 200) - return log_warn("%s: unexpected status code %ld", topic, status), NULL; - if (!(doc = json_loads(buf, 0, &error))) - return log_warn("%s: %s", topic, error.text), NULL; - - return doc; -} - -static size_t -silent(char *in, size_t n, size_t w, void *data) -{ - (void)in; - (void)n; - (void)data; - - return w; -} +#include "sciworkerd.h" static void -upload(struct task *tk) -{ - CURL *curl; - CURLcode code; - struct curl_slist *headers = NULL; - long status; - char *dump; - - curl = curl_easy_init(); - headers = curl_slist_append(headers, "Content-Type: application/json"); - curl_easy_setopt(curl, CURLOPT_URL, makeurl("api/v1/jobs")); - //curl_easy_setopt(curl, CURLOPT_URL, "http://localhost:4000"); - curl_easy_setopt(curl, CURLOPT_TIMEOUT, 3L); - curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L); - curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, silent); - curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (dump = uploadenc(tk))); - curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, strlen(dump)); - curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers); - code = curl_easy_perform(curl); - curl_slist_free_all(headers); - - /* - * If we fail to upload data, we put the result into syncing mode so - * that we retry later without redoing the job over and over - */ - tk->status = TASKST_SYNCING; - - if (code != CURLE_OK) - log_warn("upload: %s", curl_easy_strerror(code)); - else { - curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status); - - if (status != 200) - log_warn("upload: unexpected return code: %ld", status); - else - destroy(tk); - } - - free(dump); - curl_easy_cleanup(curl); -} - -static inline int -pending(int id) -{ - struct task *t; - - TAILQ_FOREACH(t, &tasks, link) - if (t->job_id == id) - return 1; - - return 0; -} - -static void -queue(int id, int project_id, const char *tag) -{ - struct task *tk; - - log_info("queued job build (%d) for tag %s\n", id, tag); - - tk = util_calloc(1, sizeof (*tk)); - tk->job_id = id; - tk->project_id = project_id; - strlcpy(tk->job_tag, tag, sizeof (tk->job_tag)); - - TAILQ_INSERT_TAIL(&tasks, tk, link); -} - -static void -merge(json_t *doc) +env(void) { - struct job jobs[SCI_JOB_LIST_MAX]; - ssize_t jobsz; - - if ((jobsz = job_from(jobs, UTIL_SIZE(jobs), doc)) < 0) - log_warn("fetchjobs: %s", strerror(errno)); - else { - for (ssize_t i = 0; i < jobsz; ++i) { - if (!pending(jobs[i].id)) - queue(jobs[i].id, jobs[i].project_id, jobs[i].tag); - } - } - - json_decref(doc); -} - -static void -fetchjobs(void) -{ - json_t *doc; - - if (!(doc = get("fetch", makeurl("api/v1/jobs/%s", config.worker)))) - log_warn("unable to retrieve jobs"); - else - merge(doc); -} - -/* - * This function reads stdout/stderr pipe from child and optionally remove them - * if they have completed. - */ -static void -readall(struct fds *fds) -{ - struct task *tk; - char buf[BUFSIZ]; - ssize_t nr; - - for (size_t i = 0; i < fds->listsz; ++i) { - if (fds->list[i].revents == 0) - continue; - if (!(tk = find_by_fd(fds->list[i].fd))) - continue; - - /* Read stdout/stderr from children pipe. */ - if ((nr = read(fds->list[i].fd, buf, sizeof (buf) - 1)) <= 0) - tk->status = TASKST_SYNCING; - else { - buf[nr] = 0; - strlcat(tk->out, buf, sizeof (tk->out)); - } - } -} - -/* - * Retrieve status code from spawned process complete or upload again if they - * failed to sync. - */ -static void -flushall(void) -{ - struct task *tk, *tmp; - - TAILQ_FOREACH_SAFE(tk, &tasks, link, tmp) - if (tk->status == TASKST_SYNCING) - upload(tk); -} - -static int -extract(struct task *tk, json_t *doc) -{ - struct project proj; - size_t len; - - if (project_from(&proj, 1, doc) < 0) { - json_decref(doc); - log_warn("fetchproject: %s", strerror(errno)); - return -1; - } - - len = strlen(proj.script); - - if ((size_t)write(tk->scriptfd, proj.script, len) != len) { - json_decref(doc); - log_warn("fetchproject: %s", strerror(errno)); - return -1; - } - - /* Close so we can finally spawn it. */ - close(tk->scriptfd); - tk->scriptfd = 0; + const char *env; - return 0; -} - -static int -fetchproject(struct task *tk) -{ - json_t *doc; - - if (!(doc = get("fetchproject", makeurl("api/v1/projects/%d", tk->project_id)))) - return -1; - - return extract(tk, doc); -} - -/* - * Create a task to run the script. This will retrieve the project script code - * at this moment and put it in a temporary file. - */ -static void -createtask(struct task *tk) -{ - if (tk->status != TASKST_PENDING) - return; - - log_debug("creating task (id=%d, tag=%s)", tk->job_id, tk->job_tag); - snprintf(tk->script, sizeof (tk->script), "/tmp/sciworkerd-%d-XXXXXX", tk->job_id); - - if ((tk->scriptfd = mkstemp(tk->script)) < 0 || - fchmod(tk->scriptfd, S_IRUSR | S_IWUSR | S_IXUSR) < 0) { - unlink(tk->script); - log_warn("%s", strerror(errno)); - return; - } - - if (fetchproject(tk) < 0) { - unlink(tk->script); - close(tk->scriptfd); - tk->scriptfd = 0; - } else - spawn(tk); -} - -/* - * Start all pending tasks if the limit of running tasks is not reached. - */ -static void -startall(void) -{ - size_t nrunning = 0; - struct task *tk; - - TAILQ_FOREACH(tk, &tasks, link) - if (tk->status == TASKST_RUNNING) - ++nrunning; - - if (nrunning >= (size_t)config.maxbuilds) - log_debug("not spawning new process because limit is reached"); - else { - tk = TAILQ_FIRST(&tasks); - - while (tk && nrunning++ < (size_t)config.maxbuilds) { - createtask(tk); - tk = TAILQ_NEXT(tk, link); - } - } -} - -static void -fetchworker(void) -{ - json_t *doc; - - if (!(doc = get("fetchworker", makeurl("api/v1/workers/%s", config.worker))) || - worker_from(&worker, 1, doc) < 0) - errx(1, "unable to retrieve worker id"); - - log_info("worker id: %d", worker.id); - log_info("worker name: %s", worker.name); - log_info("worker description: %s", worker.desc); - - json_decref(doc); -} - -static void -init(void) -{ - struct sigaction sa; - - sa.sa_flags = SA_SIGINFO | SA_RESTART; - sa.sa_sigaction = complete; - sigemptyset(&sa.sa_mask); - - if (sigaction(SIGCHLD, &sa, NULL) < 0) - err(1, "sigaction"); - - sa.sa_flags = SA_RESTART; - sa.sa_handler = stop; - sigemptyset(&sa.sa_mask); - - if (sigaction(SIGTERM, &sa, NULL) < 0 || sigaction(SIGINT, &sa, NULL) < 0) - err(1, "sigaction"); - - log_open("sciworkerd"); - fetchworker(); -} - -static struct fds -prepare(void) -{ - struct fds fds = {0}; - struct task *tk; - size_t i = 0; - - TAILQ_FOREACH(tk, &tasks, link) - if (tk->status == TASKST_RUNNING || tk->status == TASKST_COMPLETED) - fds.listsz++; - - fds.list = util_calloc(fds.listsz, sizeof (*fds.list)); - - TAILQ_FOREACH(tk, &tasks, link) { - if (tk->status == TASKST_RUNNING || tk->status == TASKST_COMPLETED) { - fds.list[i].fd = tk->pipe[0]; - fds.list[i++].events = POLLIN | POLLPRI; - } - } - - return fds; -} - -static void -run(void) -{ - struct fds fds; - - fds = prepare(); - - if (poll(fds.list, fds.listsz, 5000) < 0 && errno != EINTR) - err(1, "poll"); - - fetchjobs(); - readall(&fds); - startall(); - flushall(); -} - -static void -finish(void) -{ - size_t tot = 0; - struct task *tk, *tmp; - - TAILQ_FOREACH(tk, &tasks, link) - tot++; - - signal(SIGCHLD, SIG_IGN); - log_debug("killing remaining %zu tasks", tot); - - TAILQ_FOREACH_SAFE(tk, &tasks, link, tmp) { - if (tk->status == TASKST_RUNNING) { - kill(tk->child, SIGTERM); - waitpid(tk->child, NULL, 0); - } - - destroy(tk); - } + if ((env = getenv("SCI_URL"))) + snprintf(sciworkerd.url, sizeof (sciworkerd.url), "%s", optarg); + if ((env = getenv("SCI_WORKER"))) + snprintf(sciworkerd.name, sizeof (sciworkerd.name), "%s", optarg); } int main(int argc, char **argv) { - int ch; - const char *errstr; + int ch, val; - setprogname("sciworkerd"); + env(); + opterr = 0; - while ((ch = getopt(argc, argv, "m:u:w:")) != -1) { + while ((ch = getopt(argc, argv, "j:t:u:w:")) != -1) { switch (ch) { - case 'm': - config.maxbuilds = strtonum(optarg, 0, INT_MAX, &errstr); - - if (errstr) - errx(1, "%s: %s", optarg, errstr); - + case 'j': + if ((val = atoi(optarg)) > 0) + sciworkerd.maxjobs = val; + break; + case 't': + if ((val = atoi(optarg)) > 0) + sciworkerd.timeout = val; break; case 'u': - config.url = optarg; + snprintf(sciworkerd.url, sizeof (sciworkerd.url), "%s", optarg); break; case 'w': - config.worker = optarg; + snprintf(sciworkerd.name, sizeof (sciworkerd.name), "%s", optarg); break; default: - usage(); break; } } - - init(); - - while (alive) - run(); - - finish(); } -#endif - - - - - - - - - - -#include <err.h> -#include <errno.h> -#include <poll.h> -#include <signal.h> -#include <string.h> -#include <time.h> -#include <unistd.h> - -#include "types.h" -#include "task.h" - -#define SCRIPT \ - "#!/bin/sh\n" \ - "echo yes\n" \ - "sleep 10\n" \ - "echo no 1>&2\n" \ - "sleep 1\n" \ - "exit 1" - -int -main(void) -{ - struct job job = { - .project_id = 10, - .id = 10, - .tag = "1234" - }; - struct sigaction sa = {0}; - struct pollfd fd; - struct task *t; - int run = 1; - - t = task_new(&job); - - if (task_setup(t, SCRIPT) < 0) - err(1, "task_set_script"); - if (task_start(t) < 0) - err(1, "task_start"); - - while (run) { - if (difftime(time(NULL), task_uptime(t)) >= 3) { - printf("task timeout !\n"); - task_kill(t); - task_wait(t); - break; - } - - task_prepare(t, &fd); - - if (poll(&fd, 1, 250) < 0 && errno != EINTR) - err(1, "poll"); - - switch (task_sync(t, &fd)) { - case -1: - err(1, "task_sync"); - case 0: - run = 0; - task_wait(t); - break; - default: - /* Keep going... */ - break; - } - } - - switch (task_status(t)) { - case TASKSTATUS_EXITED: - printf("process exited with code: %d\n", task_code(t).exitcode); - break; - case TASKSTATUS_KILLED: - printf("process killed with signal %d\n", task_code(t).sigcode); - break; - default: - break; - } - - printf("== console ==\n%s==\n", task_console(t)); - task_free(t); -}