Mercurial > sci
diff sciworkerd.c @ 3:215c0c3b3609
misc: use JSON everywhere (scictl/sciwebd)
author | David Demelier <markand@malikania.fr> |
---|---|
date | Mon, 14 Jun 2021 22:08:24 +0200 |
parents | 5fa3d2f479b2 |
children | eb76429ce112 |
line wrap: on
line diff
--- a/sciworkerd.c Thu Jun 10 10:39:21 2021 +0200 +++ b/sciworkerd.c Mon Jun 14 22:08:24 2021 +0200 @@ -18,11 +18,12 @@ #include <jansson.h> #include "config.h" -#include "job.h" #include "log.h" -#include "project.h" +#include "types.h" #include "util.h" +#define TAG_MAX 256 + enum taskst { TASKST_PENDING, /* not started yet. */ TASKST_RUNNING, /* currently running. */ @@ -34,8 +35,10 @@ enum taskst status; pid_t child; int pipe[2]; - int retcode; - struct job job; + int exitcode; + int job_id; + int project_id; + char job_tag[TAG_MAX]; char out[SCI_CONSOLE_MAX]; char script[PATH_MAX]; int scriptfd; @@ -70,6 +73,7 @@ }; static struct tasks tasks = TAILQ_HEAD_INITIALIZER(tasks); +static struct worker worker; #if 0 static int sigpipe[2]; @@ -109,7 +113,7 @@ static void destroy(struct task *tk) { - log_debug("destroying task %lld", tk->job.id); + log_debug("destroying task %d", tk->job_id); if (tk->pipe[0]) close(tk->pipe[0]); @@ -141,11 +145,12 @@ dup2(tk->pipe[1], STDERR_FILENO); close(tk->pipe[0]); close(tk->pipe[1]); - log_debug("spawn: running process (%lld) %s", tk->child, tk->script); + log_debug("spawn: running process (%lld) %s", + (long long int)tk->child, tk->script); tk->status = TASKST_RUNNING; - if (execl(tk->script, tk->script, tk->job.tag, NULL) < 0) { + if (execl(tk->script, tk->script, tk->job_tag, NULL) < 0) { tk->status = TASKST_PENDING; log_warn("exec %s: %s", tk->script, strerror(errno)); exit(0); @@ -213,7 +218,7 @@ log_debug("process %lld completed", (long long int)sinfo->si_pid); close(tk->pipe[1]); tk->status = TASKST_COMPLETED; - tk->retcode = status; + tk->exitcode = status; tk->pipe[1] = 0; } @@ -228,73 +233,25 @@ #endif } -static void -init(void) -{ - struct sigaction sa; - - sa.sa_flags = SA_SIGINFO; - sa.sa_sigaction = complete; - sigemptyset(&sa.sa_mask); - - if (sigaction(SIGCHLD, &sa, NULL) < 0) - err(1, "sigaction"); - - log_open("sciworkerd"); - -#if 0 - if (pipe(sigpipe) < 0) - err(1, "pipe"); - if ((flags = fcntl(sigpipe[1], F_GETFL, 0)) < 0 || - fcntl(sigpipe[1], F_SETFL, flags | O_NONBLOCK) < 0) - err(1, "fcntl"); -#endif -} - -static struct fds -prepare(void) -{ - struct fds fds = {0}; - struct task *tk; - size_t i = 0; - - TAILQ_FOREACH(tk, &tasks, link) - if (tk->status == TASKST_RUNNING || tk->status == TASKST_COMPLETED) - fds.listsz++; - - fds.list = util_calloc(fds.listsz, sizeof (*fds.list)); - -#if 0 - fds.list[0].fd = sigpipe[0]; - fds.list[0].events = POLLIN; -#endif - printf("fd => %zu\n", fds.listsz); - - TAILQ_FOREACH(tk, &tasks, link) { - if (tk->status == TASKST_RUNNING || tk->status == TASKST_COMPLETED) { - printf("adding %d to pollin\n", tk->pipe[0]); - fds.list[i].fd = tk->pipe[0]; - fds.list[i++].events = POLLIN | POLLPRI; - } - } - - return fds; -} - static const char * uploadenc(const struct task *tk) { - static char json[SCI_MSG_MAX]; - json_t *object; + json_t *doc; + + struct jobresult res = {0}; + char *dump; - object = json_object(); - json_object_set(object, "code", json_string(tk->out)); - json_object_set(object, "id", json_integer(tk->job.id)); - json_object_set(object, "retcode", json_integer(tk->retcode)); - strlcpy(json, json_dumps(object, JSON_COMPACT), sizeof (json)); - json_decref(object); + res.job_id = tk->job_id; + res.exitcode = tk->exitcode; + res.log = tk->out; + res.worker_id = worker.id; - return json; + doc = jobresult_to(&res, 1); + dump = json_dumps(doc, JSON_COMPACT); + + json_decref(doc); + + return dump; } static size_t @@ -306,12 +263,16 @@ return w; } -static const char * +static json_t * get(const char *topic, const char *url) { CURL *curl; CURLcode code; - static char buf[SCI_MSG_MAX]; + + json_t *doc; + json_error_t error; + + char buf[SCI_MSG_MAX]; long status; FILE *fp; @@ -320,9 +281,6 @@ if (!(fp = fmemopen(buf, sizeof (buf), "w"))) err(1, "fmemopen"); -#if 0 - curl_easy_setopt(curl, CURLOPT_URL, makeurl("api/v1/script/%s", tk->job.project.name)); -#endif curl_easy_setopt(curl, CURLOPT_URL, url); curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L); curl_easy_setopt(curl, CURLOPT_TIMEOUT, 3L); @@ -341,8 +299,10 @@ return log_warn("%s: %s", topic, curl_easy_strerror(code)), NULL; if (status != 200) return log_warn("%s: unexpected status code %ld", topic, status), NULL; + if (!(doc = json_loads(buf, 0, &error))) + return log_warn("%s: %s", topic, error.text), NULL; - return buf; + return doc; } static size_t @@ -363,7 +323,7 @@ long status; curl = curl_easy_init(); - curl_easy_setopt(curl, CURLOPT_URL, makeurl("api/v1/jobs/%s", config.worker)); + curl_easy_setopt(curl, CURLOPT_URL, makeurl("api/v1/jobs")); curl_easy_setopt(curl, CURLOPT_TIMEOUT, 3L); curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L); curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, silent); @@ -393,83 +353,66 @@ static inline void finished(struct task *tk) { - log_info("task %d: completed with exit code %d", tk->child, tk->retcode); + log_info("task %d: completed with exit code %d", tk->child, tk->exitcode); printf("== OUTPUT ==\n"); puts(tk->out); upload(tk); } static inline int -pending(int64_t id) +pending(int id) { struct task *t; TAILQ_FOREACH(t, &tasks, link) - if (t->job.id == id) + if (t->job_id == id) return 1; return 0; } static void -push(int64_t id, const char *tag, const char *project) +queue(int id, int project_id, const char *tag) { struct task *tk; - log_info("queued job build (%lld) for project %s, tag %s\n", id, project, tag); + log_info("queued job build (%d) for tag %s\n", id, tag); tk = util_calloc(1, sizeof (*tk)); - tk->job.id = id; - strlcpy(tk->job.tag, tag, sizeof (tk->job.tag)); - strlcpy(tk->job.project.name, project, sizeof (tk->job.project.name)); + tk->job_id = id; + tk->project_id = project_id; + strlcpy(tk->job_tag, tag, sizeof (tk->job_tag)); TAILQ_INSERT_TAIL(&tasks, tk, link); } static void -merge(const char *str) +merge(json_t *doc) { - json_t *array, *obj, *id, *tag, *project; - json_error_t err; - size_t i; + struct job jobs[SCI_JOB_LIST_MAX]; + ssize_t jobsz; - if (!(array = json_loads(str, 0, &err))) { - log_warn("fetch: failed to decode JSON: %s", err.text); - return; - } - if (!json_is_array(array)) - goto invalid; - - json_array_foreach(array, i, obj) { - if (!json_is_object(obj) || - !json_is_integer((id = json_object_get(obj, "id"))) || - !json_is_string((tag = json_object_get(obj, "tag"))) || - !json_is_string((project = json_object_get(obj, "project")))) - goto invalid; - - if (!pending(json_integer_value(id))) - push(json_integer_value(id), json_string_value(tag), - json_string_value(project)); + if ((jobsz = job_from(jobs, UTIL_SIZE(jobs), doc)) < 0) + log_warn("fetchjobs: %s", strerror(errno)); + else { + for (ssize_t i = 0; i < jobsz; ++i) { + if (!pending(jobs[i].id)) + queue(jobs[i].id, jobs[i].project_id, jobs[i].tag); + } } - json_decref(array); - - return; - -invalid: - log_warn("fetch: invalid JSON input"); - json_decref(array); + json_decref(doc); } static void fetchjobs(void) { - const char *json; + json_t *doc; - if (!(json = get("fetch", makeurl("api/v1/jobs/%s", config.worker)))) + if (!(doc = get("fetch", makeurl("api/v1/jobs/%s", config.worker)))) log_warn("unable to retrieve jobs"); else - merge(json); + merge(doc); } /* @@ -514,26 +457,22 @@ } static int -extract(struct task *tk, const char *json) +extract(struct task *tk, json_t *doc) { - json_t *doc, *code; - json_error_t err; + struct project proj; size_t len; - if (!(doc = json_loads(json, 0, &err))) { - log_warn("fetchscript: failed to decode JSON: %s", err.text); + if (project_from(&proj, 1, doc) < 0) { + json_decref(doc); + log_warn("fetchproject: %s", strerror(errno)); return -1; } - if (!json_is_object(doc) || - !json_is_string((code = json_object_get(doc, "code")))) - goto invalid; + + len = strlen(proj.script); - len = strlen(json_string_value(code)); - - if ((size_t)write(tk->scriptfd, json_string_value(code), len) != len) { - log_warn("fetchscript: %s", strerror(errno)); + if ((size_t)write(tk->scriptfd, proj.script, len) != len) { json_decref(doc); - + log_warn("fetchproject: %s", strerror(errno)); return -1; } @@ -542,36 +481,31 @@ tk->scriptfd = 0; return 0; - -invalid: - log_warn("fetchscript: invalid JSON"); - json_decref(doc); - - return -1; } static int -fetchscript(struct task *tk) +fetchproject(struct task *tk) { - const char *json; + json_t *doc; - if (!(json = get("fetchscript", makeurl("api/v1/script/%s", tk->job.project.name)))) + if (!(doc = get("fetchproject", makeurl("api/v1/projects/%d", tk->project_id)))) return -1; - return extract(tk, json); + return extract(tk, doc); } +/* + * Create a task to run the script. This will retrieve the project script code + * at this moment and put it in a temporary file. + */ static void createtask(struct task *tk) { if (tk->status != TASKST_PENDING) return; - log_debug("creating task (id=%lld, project=%s, tag=%s)", - tk->job.id, tk->job.project.name, tk->job.tag); - - snprintf(tk->script, sizeof (tk->script), "/tmp/sciworkerd-%s-XXXXXX", - tk->job.project.name); + log_debug("creating task (id=%d, tag=%s)", tk->job_id, tk->job_tag); + snprintf(tk->script, sizeof (tk->script), "/tmp/sciworkerd-%d-XXXXXX", tk->job_id); if ((tk->scriptfd = mkstemp(tk->script)) < 0 || fchmod(tk->scriptfd, S_IRUSR | S_IWUSR | S_IXUSR) < 0) { @@ -580,7 +514,7 @@ return; } - if (fetchscript(tk) < 0) { + if (fetchproject(tk) < 0) { unlink(tk->script); close(tk->scriptfd); tk->scriptfd = 0; @@ -588,6 +522,9 @@ spawn(tk); } +/* + * Start all pending tasks if the limit of running tasks is not reached. + */ static void startall(void) { @@ -598,9 +535,9 @@ if (tk->status == TASKST_RUNNING) ++nrunning; - if (nrunning >= (size_t)config.maxbuilds) { + if (nrunning >= (size_t)config.maxbuilds) log_debug("not spawning new process because limit is reached"); - } else { + else { tk = TAILQ_FIRST(&tasks); while (tk && nrunning++ < (size_t)config.maxbuilds) { @@ -611,6 +548,74 @@ } static void +fetchworker(void) +{ + json_t *doc; + + if (!(doc = get("fetchworker", makeurl("api/v1/workers/%s", config.worker))) || + worker_from(&worker, 1, doc) < 0) + errx(1, "unable to retrieve worker id"); + + log_info("worker id: %d", worker.id); + log_info("worker name: %s", worker.name); + log_info("worker description: %s", worker.desc); + + json_decref(doc); +} + +static void +init(void) +{ + struct sigaction sa; + + sa.sa_flags = SA_SIGINFO; + sa.sa_sigaction = complete; + sigemptyset(&sa.sa_mask); + + if (sigaction(SIGCHLD, &sa, NULL) < 0) + err(1, "sigaction"); + + log_open("sciworkerd"); + fetchworker(); + +#if 0 + if (pipe(sigpipe) < 0) + err(1, "pipe"); + if ((flags = fcntl(sigpipe[1], F_GETFL, 0)) < 0 || + fcntl(sigpipe[1], F_SETFL, flags | O_NONBLOCK) < 0) + err(1, "fcntl"); +#endif +} + +static struct fds +prepare(void) +{ + struct fds fds = {0}; + struct task *tk; + size_t i = 0; + + TAILQ_FOREACH(tk, &tasks, link) + if (tk->status == TASKST_RUNNING || tk->status == TASKST_COMPLETED) + fds.listsz++; + + fds.list = util_calloc(fds.listsz, sizeof (*fds.list)); + +#if 0 + fds.list[0].fd = sigpipe[0]; + fds.list[0].events = POLLIN; +#endif + + TAILQ_FOREACH(tk, &tasks, link) { + if (tk->status == TASKST_RUNNING || tk->status == TASKST_COMPLETED) { + fds.list[i].fd = tk->pipe[0]; + fds.list[i++].events = POLLIN | POLLPRI; + } + } + + return fds; +} + +static void run(void) { struct fds fds;