Mercurial > sci
changeset 10:eb76429ce112
sciworkerd: improve process destruction
author | David Demelier <markand@malikania.fr> |
---|---|
date | Tue, 29 Jun 2021 20:40:17 +0200 |
parents | 3ef8128e244f |
children | 0647f9ec7319 |
files | page-api-jobs.c sciworkerd.c sql/init.sql |
diffstat | 3 files changed, 61 insertions(+), 71 deletions(-) [+] |
line wrap: on
line diff
--- a/page-api-jobs.c Wed Jun 23 14:05:36 2021 +0200 +++ b/page-api-jobs.c Tue Jun 29 20:40:17 2021 +0200 @@ -79,7 +79,7 @@ { if (r->fieldsz < 1) page(r, NULL, KHTTP_400, KMIME_APP_JSON, NULL); - else if (save(r->fields[0].key) < 0) + else if (save(r->fields[0].val) < 0) page(r, NULL, KHTTP_500, KMIME_APP_JSON, NULL); else { khttp_head(r, kresps[KRESP_CONTENT_TYPE], "%s", kmimetypes[KMIME_APP_JSON]);
--- a/sciworkerd.c Wed Jun 23 14:05:36 2021 +0200 +++ b/sciworkerd.c Tue Jun 29 20:40:17 2021 +0200 @@ -52,11 +52,6 @@ size_t listsz; }; -struct result { - pid_t pid; - int status; -}; - struct fetch { char buf[SCI_MSG_MAX]; FILE *bufp; @@ -74,10 +69,7 @@ static struct tasks tasks = TAILQ_HEAD_INITIALIZER(tasks); static struct worker worker; - -#if 0 -static int sigpipe[2]; -#endif +static int alive = 1; noreturn static void usage(void) @@ -114,15 +106,14 @@ destroy(struct task *tk) { log_debug("destroying task %d", tk->job_id); + unlink(tk->script); if (tk->pipe[0]) close(tk->pipe[0]); if (tk->pipe[1]) close(tk->pipe[1]); - if (tk->scriptfd) { - unlink(tk->script); + if (tk->scriptfd) close(tk->scriptfd); - } TAILQ_REMOVE(&tasks, tk, link); memset(tk, 0, sizeof (*tk)); @@ -148,8 +139,6 @@ log_debug("spawn: running process (%lld) %s", (long long int)tk->child, tk->script); - tk->status = TASKST_RUNNING; - if (execl(tk->script, tk->script, tk->job_tag, NULL) < 0) { tk->status = TASKST_PENDING; log_warn("exec %s: %s", tk->script, strerror(errno)); @@ -158,6 +147,7 @@ break; default: /* Parent */ + tk->status = TASKST_RUNNING; break; } @@ -193,47 +183,30 @@ (void)ctx; (void)signum; -#if 0 - struct result r; struct task *tk; -#endif - struct task *tk; - int status = 0; - - if (sinfo->si_code != CLD_EXITED) - return; -#if 0 - r.pid = sinfo->si_pid; - r.status = 0; -#endif - - if (waitpid(sinfo->si_pid, &status, 0) < 0) { + if (waitpid(sinfo->si_pid, NULL, 0) < 0) log_warn("waitpid: %s", strerror(errno)); - return; - } - if ((tk = find_by_pid(sinfo->si_pid))) { - log_debug("process %lld completed", (long long int)sinfo->si_pid); + log_debug("process %d terminated (exitcode=%d)", + (int)sinfo->si_pid, sinfo->si_status); + close(tk->pipe[1]); tk->status = TASKST_COMPLETED; - tk->exitcode = status; + tk->exitcode = sinfo->si_status; tk->pipe[1] = 0; } - -#if 0 - /* - * Signal may happen at any time from any thread so we can't use - * mutexes so use the good old self-pipe trick. Yes, signals are - * probably the most fundamental broken UNIX feature. - */ - if (write(sigpipe[1], &r, sizeof (r)) < 0) - err(1, "write"); -#endif } -static const char * +static void +stop(int signum) +{ + log_warn("exiting on signal %d", signum); + alive = 0; +} + +static char * uploadenc(const struct task *tk) { json_t *doc; @@ -320,15 +293,22 @@ { CURL *curl; CURLcode code; + struct curl_slist *headers = NULL; long status; + char *dump; curl = curl_easy_init(); + headers = curl_slist_append(headers, "Content-Type: application/json"); curl_easy_setopt(curl, CURLOPT_URL, makeurl("api/v1/jobs")); + //curl_easy_setopt(curl, CURLOPT_URL, "http://localhost:4000"); curl_easy_setopt(curl, CURLOPT_TIMEOUT, 3L); curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L); curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, silent); - curl_easy_setopt(curl, CURLOPT_POSTFIELDS, uploadenc(tk)); + curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (dump = uploadenc(tk))); + curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, strlen(dump)); + curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers); code = curl_easy_perform(curl); + curl_slist_free_all(headers); /* * If we fail to upload data, we put the result into syncing mode so @@ -347,18 +327,10 @@ destroy(tk); } + free(dump); curl_easy_cleanup(curl); } -static inline void -finished(struct task *tk) -{ - log_info("task %d: completed with exit code %d", tk->child, tk->exitcode); - printf("== OUTPUT ==\n"); - puts(tk->out); - upload(tk); -} - static inline int pending(int id) { @@ -568,23 +540,22 @@ { struct sigaction sa; - sa.sa_flags = SA_SIGINFO; + sa.sa_flags = SA_SIGINFO | SA_RESTART; sa.sa_sigaction = complete; sigemptyset(&sa.sa_mask); if (sigaction(SIGCHLD, &sa, NULL) < 0) err(1, "sigaction"); + sa.sa_flags = SA_RESTART; + sa.sa_handler = stop; + sigemptyset(&sa.sa_mask); + + if (sigaction(SIGTERM, &sa, NULL) < 0 || sigaction(SIGINT, &sa, NULL) < 0) + err(1, "sigaction"); + log_open("sciworkerd"); fetchworker(); - -#if 0 - if (pipe(sigpipe) < 0) - err(1, "pipe"); - if ((flags = fcntl(sigpipe[1], F_GETFL, 0)) < 0 || - fcntl(sigpipe[1], F_SETFL, flags | O_NONBLOCK) < 0) - err(1, "fcntl"); -#endif } static struct fds @@ -600,11 +571,6 @@ fds.list = util_calloc(fds.listsz, sizeof (*fds.list)); -#if 0 - fds.list[0].fd = sigpipe[0]; - fds.list[0].events = POLLIN; -#endif - TAILQ_FOREACH(tk, &tasks, link) { if (tk->status == TASKST_RUNNING || tk->status == TASKST_COMPLETED) { fds.list[i].fd = tk->pipe[0]; @@ -631,6 +597,28 @@ flushall(); } +static void +finish(void) +{ + size_t tot = 0; + struct task *tk, *tmp; + + TAILQ_FOREACH(tk, &tasks, link) + tot++; + + signal(SIGCHLD, SIG_IGN); + log_debug("killing remaining %zu tasks", tot); + + TAILQ_FOREACH_SAFE(tk, &tasks, link, tmp) { + if (tk->status == TASKST_RUNNING) { + kill(tk->child, SIGTERM); + waitpid(tk->child, NULL, 0); + } + + destroy(tk); + } +} + int main(int argc, char **argv) { @@ -662,6 +650,8 @@ init(); - for (;;) + while (alive) run(); + + finish(); }
--- a/sql/init.sql Wed Jun 23 14:05:36 2021 +0200 +++ b/sql/init.sql Tue Jun 29 20:40:17 2021 +0200 @@ -4,7 +4,7 @@ desc TEXT NOT NULL, url TEXT NOT NULL, script TEXT NOT NULL, - date INTEGER NOT NULL DEFAULT (strftime('%s','now')) + date INTEGER NOT NULL DEFAULT (strftime('%s', 'now')) ); CREATE TABLE IF NOT EXISTS worker(