Mercurial > sci
diff sciworkerd/sciworkerd.c @ 23:2cb228f23f53
misc: rework todo/jobs HTTP requests
author | David Demelier <markand@malikania.fr> |
---|---|
date | Thu, 21 Jul 2022 21:55:02 +0200 |
parents | f98ea578b1ef |
children | 34cbbd215ef7 |
line wrap: on
line diff
--- a/sciworkerd/sciworkerd.c Thu Jul 21 20:23:22 2022 +0200 +++ b/sciworkerd/sciworkerd.c Thu Jul 21 21:55:02 2022 +0200 @@ -78,7 +78,7 @@ } static void -merge(const struct job *jobs, size_t jobsz) +merge(struct job *jobs, size_t jobsz) { size_t total = 0; @@ -86,7 +86,8 @@ if (!pending(jobs[i].id)) { queue(&jobs[i]); total++; - } + } else + job_finish(&jobs[i]); } log_info(TAG "added %zu new pending tasks", total); @@ -104,13 +105,12 @@ struct job todo[SCI_JOB_LIST_MAX]; ssize_t todosz; - if (!startup) - startup = time(NULL); - if (difftime((now = time(NULL)), startup) >= sciworkerd.fetchinterval) { startup = now; - if ((todosz = apic_job_todo(&req, todo, UTIL_SIZE(todo), worker.id)) < 0) + log_info(TAG "fetching jobs"); + + if ((todosz = apic_job_todo(&req, todo, UTIL_SIZE(todo), worker.name)) < 0) log_warn(TAG "unable to fetch jobs: %s", req.error); else merge(todo, todosz); @@ -127,40 +127,15 @@ { struct apic req; - util_strlcpy(&worker.name, sciworkerd.name); + if (apic_worker_find(&req, &worker, sciworkerd.name) < 0) + log_die(TAG "unable to fetch worker info: %s", req.error); - if (apic_worker_find(&req, &worker) < 0) - log_die(TAG, "unable to fetch worker info: %s", req.error); - - log_info("worker id: %d", worker.id); log_info("worker name: %s", worker.name); log_info("worker description: %s", worker.desc); apic_finish(&req); } -/* - * Fetch information about a project. - */ -static int -fetch_project(struct project *project, int id) -{ - struct apic req; - - if (apic_project_find_id(&req, project, id) < 0) - return -1; -#if 0 - if (apic_get(&req, "%s/api/v1/projects/%d", id) < 0) - return log_warn(TAG "unable to fetch project info: %s", req.error), -1; - if (!req.doc) - return log_warn(TAG "empty project response"), -1; - if (project_from(project, 1, req.doc) < 0) - return log_warn(TAG "unable to parse project: %s", strerror(errno)), -1; -#endif - - return 0; -} - static inline size_t count(const struct taskentry *head) { @@ -180,20 +155,26 @@ static int start(struct taskentry *entry) { - struct apic; + struct apic req; struct project project; pid_t pid; + int ret = -1; - if (apic_project_find_id(&project, entry->job.project_id) < 0) + if (apic_project_find(&req, &project, entry->job.project_name) < 0) return log_warn(TAG "unable to fetch project, dropping task"), -1; + if (task_setup(entry->task, project.script) < 0) - return log_warn(TAG "unable to setup script code: %s, dropping task", strerror(errno)), -1; - if ((pid = task_start(entry->task)) < 0) - return log_warn(TAG "unable to spawn task process: %s", strerror(errno)), -1; + log_warn(TAG "unable to setup script code: %s, dropping task", strerror(errno)); + else if ((pid = task_start(entry->task)) < 0) + log_warn(TAG "unable to spawn task process: %s", strerror(errno)); + else { + log_info(TAG "task %lld spawned", (long long int)pid); + ret = 0; + } - log_info(TAG "task %lld spawned", (long long int)pid); + project_finish(&project); - return 0; + return ret; } static inline void @@ -319,10 +300,10 @@ struct jobresult res = { .job_id = iter->job.id, .exitcode = code.exitcode, - .log = task_console(iter->task), - .worker_id = worker.id + .log = util_strdup(task_console(iter->task)), + .worker_name = worker.name }; - struct apicreq req; + struct apic req; json_t *doc; int ret; @@ -335,6 +316,9 @@ else log_info(TAG "task successfully published"); + jobresult_finish(&res); + apic_finish(&req); + return ret; } @@ -361,6 +345,8 @@ sigemptyset(&sa.sa_mask); sa.sa_handler = stop; + util_strlcpy(apiconf.baseurl, sciworkerd.url, sizeof (apiconf.baseurl)); + if (sigaction(SIGINT, &sa, NULL) < 0 || sigaction(SIGTERM, &sa, NULL) < 0) log_die(TAG "sigaction: %s", strerror(errno));