diff sciworkerd.c @ 3:215c0c3b3609

misc: use JSON everywhere (scictl/sciwebd)
author David Demelier <markand@malikania.fr>
date Mon, 14 Jun 2021 22:08:24 +0200
parents 5fa3d2f479b2
children eb76429ce112
line wrap: on
line diff
--- a/sciworkerd.c	Thu Jun 10 10:39:21 2021 +0200
+++ b/sciworkerd.c	Mon Jun 14 22:08:24 2021 +0200
@@ -18,11 +18,12 @@
 #include <jansson.h>
 
 #include "config.h"
-#include "job.h"
 #include "log.h"
-#include "project.h"
+#include "types.h"
 #include "util.h"
 
+#define TAG_MAX 256
+
 enum taskst {
 	TASKST_PENDING,         /* not started yet. */
 	TASKST_RUNNING,         /* currently running. */
@@ -34,8 +35,10 @@
 	enum taskst status;
 	pid_t child;
 	int pipe[2];
-	int retcode;
-	struct job job;
+	int exitcode;
+	int job_id;
+	int project_id;
+	char job_tag[TAG_MAX];
 	char out[SCI_CONSOLE_MAX];
 	char script[PATH_MAX];
 	int scriptfd;
@@ -70,6 +73,7 @@
 };
 
 static struct tasks tasks = TAILQ_HEAD_INITIALIZER(tasks);
+static struct worker worker;
 
 #if 0
 static int sigpipe[2];
@@ -109,7 +113,7 @@
 static void
 destroy(struct task *tk)
 {
-	log_debug("destroying task %lld", tk->job.id);
+	log_debug("destroying task %d", tk->job_id);
 
 	if (tk->pipe[0])
 		close(tk->pipe[0]);
@@ -141,11 +145,12 @@
 		dup2(tk->pipe[1], STDERR_FILENO);
 		close(tk->pipe[0]);
 		close(tk->pipe[1]);
-		log_debug("spawn: running process (%lld) %s", tk->child, tk->script);
+		log_debug("spawn: running process (%lld) %s",
+		    (long long int)tk->child, tk->script);
 
 		tk->status = TASKST_RUNNING;
 
-		if (execl(tk->script, tk->script, tk->job.tag, NULL) < 0) {
+		if (execl(tk->script, tk->script, tk->job_tag, NULL) < 0) {
 			tk->status = TASKST_PENDING;
 			log_warn("exec %s: %s", tk->script, strerror(errno));
 			exit(0);
@@ -213,7 +218,7 @@
 		log_debug("process %lld completed", (long long int)sinfo->si_pid);
 		close(tk->pipe[1]);
 		tk->status = TASKST_COMPLETED;
-		tk->retcode = status;
+		tk->exitcode = status;
 		tk->pipe[1] = 0;
 	}
 
@@ -228,73 +233,25 @@
 #endif
 }
 
-static void
-init(void)
-{
-	struct sigaction sa;
-
-	sa.sa_flags = SA_SIGINFO;
-	sa.sa_sigaction = complete;
-	sigemptyset(&sa.sa_mask);
-
-	if (sigaction(SIGCHLD, &sa, NULL) < 0)
-		err(1, "sigaction");
-
-	log_open("sciworkerd");
-
-#if 0
-	if (pipe(sigpipe) < 0)
-		err(1, "pipe");
-	if ((flags = fcntl(sigpipe[1], F_GETFL, 0)) < 0 ||
-	    fcntl(sigpipe[1], F_SETFL, flags | O_NONBLOCK) < 0)
-		err(1, "fcntl");
-#endif
-}
-
-static struct fds
-prepare(void)
-{
-	struct fds fds = {0};
-	struct task *tk;
-	size_t i = 0;
-
-	TAILQ_FOREACH(tk, &tasks, link)
-		if (tk->status == TASKST_RUNNING || tk->status == TASKST_COMPLETED)
-			fds.listsz++;
-
-	fds.list = util_calloc(fds.listsz, sizeof (*fds.list));
-
-#if 0
-	fds.list[0].fd = sigpipe[0];
-	fds.list[0].events = POLLIN;
-#endif
-	printf("fd => %zu\n", fds.listsz);
-
-	TAILQ_FOREACH(tk, &tasks, link) {
-		if (tk->status == TASKST_RUNNING || tk->status == TASKST_COMPLETED) {
-			printf("adding %d to pollin\n", tk->pipe[0]);
-			fds.list[i].fd = tk->pipe[0];
-			fds.list[i++].events = POLLIN | POLLPRI;
-		}
-	}
-
-	return fds;
-}
-
 static const char *
 uploadenc(const struct task *tk)
 {
-	static char json[SCI_MSG_MAX];
-	json_t *object;
+	json_t *doc;
+
+	struct jobresult res = {0};
+	char *dump;
 
-	object = json_object();
-	json_object_set(object, "code", json_string(tk->out));
-	json_object_set(object, "id", json_integer(tk->job.id));
-	json_object_set(object, "retcode", json_integer(tk->retcode));
-	strlcpy(json, json_dumps(object, JSON_COMPACT), sizeof (json));
-	json_decref(object);
+	res.job_id = tk->job_id;
+	res.exitcode = tk->exitcode;
+	res.log = tk->out;
+	res.worker_id = worker.id;
 
-	return json;
+	doc = jobresult_to(&res, 1);
+	dump = json_dumps(doc, JSON_COMPACT);
+
+	json_decref(doc);
+
+	return dump;
 }
 
 static size_t
@@ -306,12 +263,16 @@
 	return w;
 }
 
-static const char *
+static json_t *
 get(const char *topic, const char *url)
 {
 	CURL *curl;
 	CURLcode code;
-	static char buf[SCI_MSG_MAX];
+
+	json_t *doc;
+	json_error_t error;
+
+	char buf[SCI_MSG_MAX];
 	long status;
 	FILE *fp;
 
@@ -320,9 +281,6 @@
 	if (!(fp = fmemopen(buf, sizeof (buf), "w")))
 		err(1, "fmemopen");
 
-#if 0
-	curl_easy_setopt(curl, CURLOPT_URL, makeurl("api/v1/script/%s", tk->job.project.name));
-#endif
 	curl_easy_setopt(curl, CURLOPT_URL, url);
 	curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
 	curl_easy_setopt(curl, CURLOPT_TIMEOUT, 3L);
@@ -341,8 +299,10 @@
 		return log_warn("%s: %s", topic, curl_easy_strerror(code)), NULL;
 	if (status != 200)
 		return log_warn("%s: unexpected status code %ld", topic, status), NULL;
+	if (!(doc = json_loads(buf, 0, &error)))
+		return log_warn("%s: %s", topic, error.text), NULL;
 
-	return buf;
+	return doc;
 }
 
 static size_t
@@ -363,7 +323,7 @@
 	long status;
 
 	curl = curl_easy_init();
-	curl_easy_setopt(curl, CURLOPT_URL, makeurl("api/v1/jobs/%s", config.worker));
+	curl_easy_setopt(curl, CURLOPT_URL, makeurl("api/v1/jobs"));
 	curl_easy_setopt(curl, CURLOPT_TIMEOUT, 3L);
 	curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
 	curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, silent);
@@ -393,83 +353,66 @@
 static inline void
 finished(struct task *tk)
 {
-	log_info("task %d: completed with exit code %d", tk->child, tk->retcode);
+	log_info("task %d: completed with exit code %d", tk->child, tk->exitcode);
 	printf("== OUTPUT ==\n");
 	puts(tk->out);
 	upload(tk);
 }
 
 static inline int
-pending(int64_t id)
+pending(int id)
 {
 	struct task *t;
 
 	TAILQ_FOREACH(t, &tasks, link)
-		if (t->job.id == id)
+		if (t->job_id == id)
 			return 1;
 
 	return 0;
 }
 
 static void
-push(int64_t id, const char *tag, const char *project)
+queue(int id, int project_id, const char *tag)
 {
 	struct task *tk;
 
-	log_info("queued job build (%lld) for project %s, tag %s\n", id, project, tag);
+	log_info("queued job build (%d) for tag %s\n", id, tag);
 
 	tk = util_calloc(1, sizeof (*tk));
-	tk->job.id = id;
-	strlcpy(tk->job.tag, tag, sizeof (tk->job.tag));
-	strlcpy(tk->job.project.name, project, sizeof (tk->job.project.name));
+	tk->job_id = id;
+	tk->project_id = project_id;
+	strlcpy(tk->job_tag, tag, sizeof (tk->job_tag));
 
 	TAILQ_INSERT_TAIL(&tasks, tk, link);
 }
 
 static void
-merge(const char *str)
+merge(json_t *doc)
 {
-	json_t *array, *obj, *id, *tag, *project;
-	json_error_t err;
-	size_t i;
+	struct job jobs[SCI_JOB_LIST_MAX];
+	ssize_t jobsz;
 
-	if (!(array = json_loads(str, 0, &err))) {
-		log_warn("fetch: failed to decode JSON: %s", err.text);
-		return;
-	}
-	if (!json_is_array(array))
-		goto invalid;
-
-	json_array_foreach(array, i, obj) {
-		if (!json_is_object(obj) ||
-		    !json_is_integer((id = json_object_get(obj, "id"))) ||
-		    !json_is_string((tag = json_object_get(obj, "tag"))) ||
-		    !json_is_string((project = json_object_get(obj, "project"))))
-			goto invalid;
-
-		if (!pending(json_integer_value(id)))
-			push(json_integer_value(id), json_string_value(tag),
-			    json_string_value(project));
+	if ((jobsz = job_from(jobs, UTIL_SIZE(jobs), doc)) < 0)
+		log_warn("fetchjobs: %s", strerror(errno));
+	else {
+		for (ssize_t i = 0; i < jobsz; ++i) {
+			if (!pending(jobs[i].id))
+				queue(jobs[i].id, jobs[i].project_id, jobs[i].tag);
+		}
 	}
 
-	json_decref(array);
-
-	return;
-
-invalid:
-	log_warn("fetch: invalid JSON input");
-	json_decref(array);
+	json_decref(doc);
 }
 
 static void
 fetchjobs(void)
 {
-	const char *json;
+	json_t *doc;
 
-	if (!(json = get("fetch", makeurl("api/v1/jobs/%s", config.worker))))
+	if (!(doc = get("fetch", makeurl("api/v1/jobs/%s", config.worker))))
 		log_warn("unable to retrieve jobs");
 	else
-		merge(json);
+		merge(doc);
 }
 
 /*
@@ -514,26 +457,22 @@
 }
 
 static int
-extract(struct task *tk, const char *json)
+extract(struct task *tk, json_t *doc)
 {
-	json_t *doc, *code;
-	json_error_t err;
+	struct project proj;
 	size_t len;
 
-	if (!(doc = json_loads(json, 0, &err))) {
-		log_warn("fetchscript: failed to decode JSON: %s", err.text);
+	if (project_from(&proj, 1, doc) < 0) {
+		json_decref(doc);
+		log_warn("fetchproject: %s", strerror(errno));
 		return -1;
 	}
-	if (!json_is_object(doc) ||
-	    !json_is_string((code = json_object_get(doc, "code"))))
-		goto invalid;
+
+	len = strlen(proj.script);
 
-	len = strlen(json_string_value(code));
-
-	if ((size_t)write(tk->scriptfd, json_string_value(code), len) != len) {
-		log_warn("fetchscript: %s", strerror(errno));
+	if ((size_t)write(tk->scriptfd, proj.script, len) != len) {
 		json_decref(doc);
-
+		log_warn("fetchproject: %s", strerror(errno));
 		return -1;
 	}
 
@@ -542,36 +481,31 @@
 	tk->scriptfd = 0;
 
 	return 0;
-
-invalid:
-	log_warn("fetchscript: invalid JSON");
-	json_decref(doc);
-
-	return -1;
 }
 
 static int
-fetchscript(struct task *tk)
+fetchproject(struct task *tk)
 {
-	const char *json;
+	json_t *doc;
 
-	if (!(json = get("fetchscript", makeurl("api/v1/script/%s", tk->job.project.name))))
+	if (!(doc = get("fetchproject", makeurl("api/v1/projects/%d", tk->project_id))))
 		return -1;
 
-	return extract(tk, json);
+	return extract(tk, doc);
 }
 
+/*
+ * Create a task to run the script. This will retrieve the project script code
+ * at this moment and put it in a temporary file.
+ */
 static void
 createtask(struct task *tk)
 {
 	if (tk->status != TASKST_PENDING)
 		return;
 
-	log_debug("creating task (id=%lld, project=%s, tag=%s)",
-	    tk->job.id, tk->job.project.name, tk->job.tag);
-
-	snprintf(tk->script, sizeof (tk->script), "/tmp/sciworkerd-%s-XXXXXX",
-	    tk->job.project.name);
+	log_debug("creating task (id=%d, tag=%s)", tk->job_id, tk->job_tag);
+	snprintf(tk->script, sizeof (tk->script), "/tmp/sciworkerd-%d-XXXXXX", tk->job_id);
 
 	if ((tk->scriptfd = mkstemp(tk->script)) < 0 ||
 	    fchmod(tk->scriptfd, S_IRUSR | S_IWUSR | S_IXUSR) < 0) {
@@ -580,7 +514,7 @@
 		return;
 	}
 
-	if (fetchscript(tk) < 0) {
+	if (fetchproject(tk) < 0) {
 		unlink(tk->script);
 		close(tk->scriptfd);
 		tk->scriptfd = 0;
@@ -588,6 +522,9 @@
 		spawn(tk);
 }
 
+/*
+ * Start all pending tasks if the limit of running tasks is not reached.
+ */
 static void
 startall(void)
 {
@@ -598,9 +535,9 @@
 		if (tk->status == TASKST_RUNNING)
 			++nrunning;
 
-	if (nrunning >= (size_t)config.maxbuilds) {
+	if (nrunning >= (size_t)config.maxbuilds)
 		log_debug("not spawning new process because limit is reached");
-	} else {
+	else {
 		tk = TAILQ_FIRST(&tasks);
 
 		while (tk && nrunning++ < (size_t)config.maxbuilds) {
@@ -611,6 +548,74 @@
 }
 
 static void
+fetchworker(void)
+{
+	json_t *doc;
+
+	if (!(doc = get("fetchworker", makeurl("api/v1/workers/%s", config.worker))) ||
+	    worker_from(&worker, 1, doc) < 0)
+		errx(1, "unable to retrieve worker id");
+
+	log_info("worker id: %d", worker.id);
+	log_info("worker name: %s", worker.name);
+	log_info("worker description: %s", worker.desc);
+
+	json_decref(doc);
+}
+
+static void
+init(void)
+{
+	struct sigaction sa;
+
+	sa.sa_flags = SA_SIGINFO;
+	sa.sa_sigaction = complete;
+	sigemptyset(&sa.sa_mask);
+
+	if (sigaction(SIGCHLD, &sa, NULL) < 0)
+		err(1, "sigaction");
+
+	log_open("sciworkerd");
+	fetchworker();
+
+#if 0
+	if (pipe(sigpipe) < 0)
+		err(1, "pipe");
+	if ((flags = fcntl(sigpipe[1], F_GETFL, 0)) < 0 ||
+	    fcntl(sigpipe[1], F_SETFL, flags | O_NONBLOCK) < 0)
+		err(1, "fcntl");
+#endif
+}
+
+static struct fds
+prepare(void)
+{
+	struct fds fds = {0};
+	struct task *tk;
+	size_t i = 0;
+
+	TAILQ_FOREACH(tk, &tasks, link)
+		if (tk->status == TASKST_RUNNING || tk->status == TASKST_COMPLETED)
+			fds.listsz++;
+
+	fds.list = util_calloc(fds.listsz, sizeof (*fds.list));
+
+#if 0
+	fds.list[0].fd = sigpipe[0];
+	fds.list[0].events = POLLIN;
+#endif
+
+	TAILQ_FOREACH(tk, &tasks, link) {
+		if (tk->status == TASKST_RUNNING || tk->status == TASKST_COMPLETED) {
+			fds.list[i].fd = tk->pipe[0];
+			fds.list[i++].events = POLLIN | POLLPRI;
+		}
+	}
+
+	return fds;
+}
+
+static void
 run(void)
 {
 	struct fds fds;