changeset 10:eb76429ce112

sciworkerd: improve process destruction
author David Demelier <markand@malikania.fr>
date Tue, 29 Jun 2021 20:40:17 +0200
parents 3ef8128e244f
children 0647f9ec7319
files page-api-jobs.c sciworkerd.c sql/init.sql
diffstat 3 files changed, 61 insertions(+), 71 deletions(-) [+]
line wrap: on
line diff
--- a/page-api-jobs.c	Wed Jun 23 14:05:36 2021 +0200
+++ b/page-api-jobs.c	Tue Jun 29 20:40:17 2021 +0200
@@ -79,7 +79,7 @@
 {
 	if (r->fieldsz < 1)
 		page(r, NULL, KHTTP_400, KMIME_APP_JSON, NULL);
-	else if (save(r->fields[0].key) < 0)
+	else if (save(r->fields[0].val) < 0)
 		page(r, NULL, KHTTP_500, KMIME_APP_JSON, NULL);
 	else {
 		khttp_head(r, kresps[KRESP_CONTENT_TYPE], "%s", kmimetypes[KMIME_APP_JSON]);
--- a/sciworkerd.c	Wed Jun 23 14:05:36 2021 +0200
+++ b/sciworkerd.c	Tue Jun 29 20:40:17 2021 +0200
@@ -52,11 +52,6 @@
 	size_t listsz;
 };
 
-struct result {
-	pid_t pid;
-	int status;
-};
-
 struct fetch {
 	char buf[SCI_MSG_MAX];
 	FILE *bufp;
@@ -74,10 +69,7 @@
 
 static struct tasks tasks = TAILQ_HEAD_INITIALIZER(tasks);
 static struct worker worker;
-
-#if 0
-static int sigpipe[2];
-#endif
+static int alive = 1;
 
 noreturn static void
 usage(void)
@@ -114,15 +106,14 @@
 destroy(struct task *tk)
 {
 	log_debug("destroying task %d", tk->job_id);
+	unlink(tk->script);
 
 	if (tk->pipe[0])
 		close(tk->pipe[0]);
 	if (tk->pipe[1])
 		close(tk->pipe[1]);
-	if (tk->scriptfd) {
-		unlink(tk->script);
+	if (tk->scriptfd)
 		close(tk->scriptfd);
-	}
 
 	TAILQ_REMOVE(&tasks, tk, link);
 	memset(tk, 0, sizeof (*tk));
@@ -148,8 +139,6 @@
 		log_debug("spawn: running process (%lld) %s",
 		    (long long int)tk->child, tk->script);
 
-		tk->status = TASKST_RUNNING;
-
 		if (execl(tk->script, tk->script, tk->job_tag, NULL) < 0) {
 			tk->status = TASKST_PENDING;
 			log_warn("exec %s: %s", tk->script, strerror(errno));
@@ -158,6 +147,7 @@
 		break;
 	default:
 		/* Parent */
+		tk->status = TASKST_RUNNING;
 		break;
 	}
 
@@ -193,47 +183,30 @@
 	(void)ctx;
 	(void)signum;
 
-#if 0
-	struct result r;
 	struct task *tk;
-#endif
-	struct task *tk;
-	int status = 0;
-
-	if (sinfo->si_code != CLD_EXITED)
-		return;
 
-#if 0
-	r.pid = sinfo->si_pid;
-	r.status = 0;
-#endif
-
-	if (waitpid(sinfo->si_pid, &status, 0) < 0) {
+	if (waitpid(sinfo->si_pid, NULL, 0) < 0)
 		log_warn("waitpid: %s", strerror(errno));
-		return;
-	}
-
 
 	if ((tk = find_by_pid(sinfo->si_pid))) {
-		log_debug("process %lld completed", (long long int)sinfo->si_pid);
+		log_debug("process %d terminated (exitcode=%d)",
+		    (int)sinfo->si_pid, sinfo->si_status);
+
 		close(tk->pipe[1]);
 		tk->status = TASKST_COMPLETED;
-		tk->exitcode = status;
+		tk->exitcode = sinfo->si_status;
 		tk->pipe[1] = 0;
 	}
-
-#if 0
-	/*
-	 * Signal may happen at any time from any thread so we can't use
-	 * mutexes so use the good old self-pipe trick. Yes, signals are
-	 * probably the most fundamental broken UNIX feature.
-	 */
-	if (write(sigpipe[1], &r, sizeof (r)) < 0)
-		err(1, "write");
-#endif
 }
 
-static const char *
+static void
+stop(int signum)
+{
+	log_warn("exiting on signal %d", signum);
+	alive = 0;
+}
+
+static char *
 uploadenc(const struct task *tk)
 {
 	json_t *doc;
@@ -320,15 +293,22 @@
 {
 	CURL *curl;
 	CURLcode code;
+	struct curl_slist *headers = NULL;
 	long status;
+	char *dump;
 
 	curl = curl_easy_init();
+	headers = curl_slist_append(headers, "Content-Type: application/json");
 	curl_easy_setopt(curl, CURLOPT_URL, makeurl("api/v1/jobs"));
+	//curl_easy_setopt(curl, CURLOPT_URL, "http://localhost:4000");
 	curl_easy_setopt(curl, CURLOPT_TIMEOUT, 3L);
 	curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
 	curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, silent);
-	curl_easy_setopt(curl, CURLOPT_POSTFIELDS, uploadenc(tk));
+	curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (dump = uploadenc(tk)));
+	curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, strlen(dump));
+	curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
 	code = curl_easy_perform(curl);
+	curl_slist_free_all(headers);
 
 	/*
 	 * If we fail to upload data, we put the result into syncing mode so
@@ -347,18 +327,10 @@
 			destroy(tk);
 	}
 
+	free(dump);
 	curl_easy_cleanup(curl);
 }
 
-static inline void
-finished(struct task *tk)
-{
-	log_info("task %d: completed with exit code %d", tk->child, tk->exitcode);
-	printf("== OUTPUT ==\n");
-	puts(tk->out);
-	upload(tk);
-}
-
 static inline int
 pending(int id)
 {
@@ -568,23 +540,22 @@
 {
 	struct sigaction sa;
 
-	sa.sa_flags = SA_SIGINFO;
+	sa.sa_flags = SA_SIGINFO | SA_RESTART;
 	sa.sa_sigaction = complete;
 	sigemptyset(&sa.sa_mask);
 
 	if (sigaction(SIGCHLD, &sa, NULL) < 0)
 		err(1, "sigaction");
 
+	sa.sa_flags = SA_RESTART;
+	sa.sa_handler = stop;
+	sigemptyset(&sa.sa_mask);
+
+	if (sigaction(SIGTERM, &sa, NULL) < 0 || sigaction(SIGINT, &sa, NULL) < 0)
+		err(1, "sigaction");
+
 	log_open("sciworkerd");
 	fetchworker();
-
-#if 0
-	if (pipe(sigpipe) < 0)
-		err(1, "pipe");
-	if ((flags = fcntl(sigpipe[1], F_GETFL, 0)) < 0 ||
-	    fcntl(sigpipe[1], F_SETFL, flags | O_NONBLOCK) < 0)
-		err(1, "fcntl");
-#endif
 }
 
 static struct fds
@@ -600,11 +571,6 @@
 
 	fds.list = util_calloc(fds.listsz, sizeof (*fds.list));
 
-#if 0
-	fds.list[0].fd = sigpipe[0];
-	fds.list[0].events = POLLIN;
-#endif
-
 	TAILQ_FOREACH(tk, &tasks, link) {
 		if (tk->status == TASKST_RUNNING || tk->status == TASKST_COMPLETED) {
 			fds.list[i].fd = tk->pipe[0];
@@ -631,6 +597,28 @@
 	flushall();
 }
 
+static void
+finish(void)
+{
+	size_t tot = 0;
+	struct task *tk, *tmp;
+
+	TAILQ_FOREACH(tk, &tasks, link)
+		tot++;
+
+	signal(SIGCHLD, SIG_IGN);
+	log_debug("killing remaining %zu tasks", tot);
+
+	TAILQ_FOREACH_SAFE(tk, &tasks, link, tmp) {
+		if (tk->status == TASKST_RUNNING) {
+			kill(tk->child, SIGTERM);
+			waitpid(tk->child, NULL, 0);
+		}
+
+		destroy(tk);
+	}
+}
+
 int
 main(int argc, char **argv)
 {
@@ -662,6 +650,8 @@
 
 	init();
 
-	for (;;)
+	while (alive)
 		run();
+
+	finish();
 }
--- a/sql/init.sql	Wed Jun 23 14:05:36 2021 +0200
+++ b/sql/init.sql	Tue Jun 29 20:40:17 2021 +0200
@@ -4,7 +4,7 @@
 	desc TEXT NOT NULL,
 	url TEXT NOT NULL,
 	script TEXT NOT NULL,
-	date INTEGER NOT NULL DEFAULT (strftime('%s','now'))
+	date INTEGER NOT NULL DEFAULT (strftime('%s', 'now'))
 );
 
 CREATE TABLE IF NOT EXISTS worker(