Mercurial > sci
changeset 23:2cb228f23f53
misc: rework todo/jobs HTTP requests
author | David Demelier <markand@malikania.fr> |
---|---|
date | Thu, 21 Jul 2022 21:55:02 +0200 |
parents | dd078aea5d02 |
children | 34cbbd215ef7 |
files | Makefile lib/apic.c lib/apic.h scictl/scictl.c scid/http.c scid/page-api-jobs.c scid/page-api-jobs.h sciworkerd/main.c sciworkerd/sciworkerd.c |
diffstat | 9 files changed, 152 insertions(+), 50 deletions(-) [+] |
line wrap: on
line diff
--- a/Makefile Thu Jul 21 20:23:22 2022 +0200 +++ b/Makefile Thu Jul 21 21:55:02 2022 +0200 @@ -52,6 +52,7 @@ SCID= scid/scid SCID_SRCS= scid/http.c \ scid/main.c \ + scid/page-api-jobs.c \ scid/page-api-projects.c \ scid/page-api-todo.c \ scid/page-api-workers.c \ @@ -59,6 +60,7 @@ SCID_OBJS= ${SCID_SRCS:.c=.o} SCID_DEPS= ${SCID_SRCS:.c=.d} +SCIWORKERD= sciworkerd/sciworkerd SCIWORKERD_SRCS= sciworkerd/main.c \ sciworkerd/sciworkerd.c \ sciworkerd/task.c
--- a/lib/apic.c Thu Jul 21 20:23:22 2022 +0200 +++ b/lib/apic.c Thu Jul 21 21:55:02 2022 +0200 @@ -283,7 +283,7 @@ } ssize_t -apic_job_todo(struct apic *req, struct job *jobs, size_t jobsz, intmax_t worker_id) +apic_job_todo(struct apic *req, struct job *jobs, size_t jobsz, const char *worker_name) { assert(req); assert(jobs); @@ -294,7 +294,7 @@ .unpack = wrap_job_from }; - return get(req, &cv, "api/v1/jobs/%jd", worker_id); + return get(req, &cv, "api/v1/todo/%s", worker_name); } int
--- a/lib/apic.h Thu Jul 21 20:23:22 2022 +0200 +++ b/lib/apic.h Thu Jul 21 21:55:02 2022 +0200 @@ -2,7 +2,6 @@ #define SCI_APIC_H #include <sys/types.h> -#include <stdint.h> #include <jansson.h> @@ -47,7 +46,7 @@ apic_job_add(struct apic *, struct job *); ssize_t -apic_job_todo(struct apic *, struct job *, size_t, intmax_t); +apic_job_todo(struct apic *, struct job *, size_t, const char *); int apic_jobresult_add(struct apic *, struct jobresult *);
--- a/scictl/scictl.c Thu Jul 21 20:23:22 2022 +0200 +++ b/scictl/scictl.c Thu Jul 21 21:55:02 2022 +0200 @@ -133,7 +133,7 @@ if (argc < 2) usage(); - if ((jobsz = apic_job_todo(&req, jobs, UTIL_SIZE(jobs), toint(argv[1])))) + if ((jobsz = apic_job_todo(&req, jobs, UTIL_SIZE(jobs), argv[1]))) util_die("abort: %s\n", req.error); for (size_t i = 0; i < jobsz; ++i) {
--- a/scid/http.c Thu Jul 21 20:23:22 2022 +0200 +++ b/scid/http.c Thu Jul 21 21:55:02 2022 +0200 @@ -27,10 +27,11 @@ #include "http.h" #include "log.h" -#include "page.h" +#include "page-api-jobs.h" #include "page-api-projects.h" #include "page-api-todo.h" #include "page-api-workers.h" +#include "page.h" enum page { PAGE_API, @@ -44,8 +45,9 @@ const char *prefix; void (*handler)(struct kreq *); } apis[] = { + { "v1/jobs", page_api_v1_jobs }, + { "v1/projects", page_api_v1_projects }, { "v1/todo", page_api_v1_todo }, - { "v1/projects", page_api_v1_projects }, { "v1/workers", page_api_v1_workers }, { NULL, NULL } };
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/scid/page-api-jobs.c Thu Jul 21 21:55:02 2022 +0200 @@ -0,0 +1,82 @@ +/* + * page-api-jobs.c -- /api/v?/jobs route + * + * Copyright (c) 2021 David Demelier <markand@malikania.fr> + * + * Permission to use, copy, modify, and/or distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +#include <sys/types.h> +#include <assert.h> +#include <stdarg.h> +#include <stdint.h> +#include <kcgi.h> + +#include "db.h" +#include "log.h" +#include "page.h" +#include "types.h" + +static int +save(const char *json) +{ + struct job job = {0}; + int ret = -1; + + json_t *doc; + json_error_t err; + + if (!(doc = json_loads(json, 0, &err))) + log_warn("api/post: invalid JSON input: %s", err.text); + else if (job_from(&job, 1, doc) < 0) + log_warn("api/post: failed to decode parameters"); + else if (db_job_add(&job) < 0) + log_warn("api/post: database save error"); + else + ret = 0; + + json_decref(doc); + job_finish(&job); + + return ret; +} + +static void +post(struct kreq *r) +{ + if (r->fieldsz < 1) + page(r, NULL, KHTTP_400, KMIME_APP_JSON, NULL); + else if (save(r->fields[0].val) < 0) + page(r, NULL, KHTTP_500, KMIME_APP_JSON, NULL); + else { + khttp_head(r, kresps[KRESP_CONTENT_TYPE], "%s", kmimetypes[KMIME_APP_JSON]); + khttp_head(r, kresps[KRESP_STATUS], "%s", khttps[KHTTP_200]); + khttp_body(r); + khttp_free(r); + } +} + +void +page_api_v1_jobs(struct kreq *r) +{ + assert(r); + + switch (r->method) { + case KMETHOD_POST: + post(r); + break; + default: + page(r, NULL, KHTTP_400, KMIME_APP_JSON, NULL); + break; + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/scid/page-api-jobs.h Thu Jul 21 21:55:02 2022 +0200 @@ -0,0 +1,27 @@ +/* + * page-api-jobs.h -- /api/v?/jobs route + * + * Copyright (c) 2021 David Demelier <markand@malikania.fr> + * + * Permission to use, copy, modify, and/or distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +#ifndef SCI_PAGE_API_JOBS_H +#define SCI_PAGE_API_JOBS_H + +struct kreq; + +void +page_api_v1_jobs(struct kreq *); + +#endif /* !SCI_PAGE_API_JOBS_H */
--- a/sciworkerd/main.c Thu Jul 21 20:23:22 2022 +0200 +++ b/sciworkerd/main.c Thu Jul 21 21:55:02 2022 +0200 @@ -61,4 +61,8 @@ break; } } + + sciworkerd_init(); + sciworkerd_run(); + sciworkerd_finish(); }
--- a/sciworkerd/sciworkerd.c Thu Jul 21 20:23:22 2022 +0200 +++ b/sciworkerd/sciworkerd.c Thu Jul 21 21:55:02 2022 +0200 @@ -78,7 +78,7 @@ } static void -merge(const struct job *jobs, size_t jobsz) +merge(struct job *jobs, size_t jobsz) { size_t total = 0; @@ -86,7 +86,8 @@ if (!pending(jobs[i].id)) { queue(&jobs[i]); total++; - } + } else + job_finish(&jobs[i]); } log_info(TAG "added %zu new pending tasks", total); @@ -104,13 +105,12 @@ struct job todo[SCI_JOB_LIST_MAX]; ssize_t todosz; - if (!startup) - startup = time(NULL); - if (difftime((now = time(NULL)), startup) >= sciworkerd.fetchinterval) { startup = now; - if ((todosz = apic_job_todo(&req, todo, UTIL_SIZE(todo), worker.id)) < 0) + log_info(TAG "fetching jobs"); + + if ((todosz = apic_job_todo(&req, todo, UTIL_SIZE(todo), worker.name)) < 0) log_warn(TAG "unable to fetch jobs: %s", req.error); else merge(todo, todosz); @@ -127,40 +127,15 @@ { struct apic req; - util_strlcpy(&worker.name, sciworkerd.name); + if (apic_worker_find(&req, &worker, sciworkerd.name) < 0) + log_die(TAG "unable to fetch worker info: %s", req.error); - if (apic_worker_find(&req, &worker) < 0) - log_die(TAG, "unable to fetch worker info: %s", req.error); - - log_info("worker id: %d", worker.id); log_info("worker name: %s", worker.name); log_info("worker description: %s", worker.desc); apic_finish(&req); } -/* - * Fetch information about a project. - */ -static int -fetch_project(struct project *project, int id) -{ - struct apic req; - - if (apic_project_find_id(&req, project, id) < 0) - return -1; -#if 0 - if (apic_get(&req, "%s/api/v1/projects/%d", id) < 0) - return log_warn(TAG "unable to fetch project info: %s", req.error), -1; - if (!req.doc) - return log_warn(TAG "empty project response"), -1; - if (project_from(project, 1, req.doc) < 0) - return log_warn(TAG "unable to parse project: %s", strerror(errno)), -1; -#endif - - return 0; -} - static inline size_t count(const struct taskentry *head) { @@ -180,20 +155,26 @@ static int start(struct taskentry *entry) { - struct apic; + struct apic req; struct project project; pid_t pid; + int ret = -1; - if (apic_project_find_id(&project, entry->job.project_id) < 0) + if (apic_project_find(&req, &project, entry->job.project_name) < 0) return log_warn(TAG "unable to fetch project, dropping task"), -1; + if (task_setup(entry->task, project.script) < 0) - return log_warn(TAG "unable to setup script code: %s, dropping task", strerror(errno)), -1; - if ((pid = task_start(entry->task)) < 0) - return log_warn(TAG "unable to spawn task process: %s", strerror(errno)), -1; + log_warn(TAG "unable to setup script code: %s, dropping task", strerror(errno)); + else if ((pid = task_start(entry->task)) < 0) + log_warn(TAG "unable to spawn task process: %s", strerror(errno)); + else { + log_info(TAG "task %lld spawned", (long long int)pid); + ret = 0; + } - log_info(TAG "task %lld spawned", (long long int)pid); + project_finish(&project); - return 0; + return ret; } static inline void @@ -319,10 +300,10 @@ struct jobresult res = { .job_id = iter->job.id, .exitcode = code.exitcode, - .log = task_console(iter->task), - .worker_id = worker.id + .log = util_strdup(task_console(iter->task)), + .worker_name = worker.name }; - struct apicreq req; + struct apic req; json_t *doc; int ret; @@ -335,6 +316,9 @@ else log_info(TAG "task successfully published"); + jobresult_finish(&res); + apic_finish(&req); + return ret; } @@ -361,6 +345,8 @@ sigemptyset(&sa.sa_mask); sa.sa_handler = stop; + util_strlcpy(apiconf.baseurl, sciworkerd.url, sizeof (apiconf.baseurl)); + if (sigaction(SIGINT, &sa, NULL) < 0 || sigaction(SIGTERM, &sa, NULL) < 0) log_die(TAG "sigaction: %s", strerror(errno));