diff sciworkerd/main.c @ 18:600204c31bf0

misc: refactor
author David Demelier <markand@malikania.fr>
date Tue, 12 Jul 2022 20:20:51 +0200
parents sciworkerd.c@40fe70256fb0
children de4bf839b565
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sciworkerd/main.c	Tue Jul 12 20:20:51 2022 +0200
@@ -0,0 +1,734 @@
+/*
+ * sciworkerd.c -- main sciworkerd(8) program file
+ *
+ * Copyright (c) 2021 David Demelier <markand@malikania.fr>
+ *
+ * Permission to use, copy, modify, and/or distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+#if 0
+
+#include <sys/queue.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+#include <assert.h>
+#include <err.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <limits.h>
+#include <poll.h>
+#include <signal.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdnoreturn.h>
+#include <string.h>
+#include <unistd.h>
+
+#include <curl/curl.h>
+#include <jansson.h>
+
+#include "config.h"
+#include "log.h"
+#include "types.h"
+#include "util.h"
+
+#define TAG_MAX 256
+
+struct task {
+	enum taskst status;
+	pid_t child;
+	int pipe[2];
+	int exitcode;
+	int job_id;
+	int project_id;
+	char job_tag[TAG_MAX];
+	char out[SCI_CONSOLE_MAX];
+	char script[PATH_MAX];
+	int scriptfd;
+	TAILQ_ENTRY(task) link;
+};
+
+TAILQ_HEAD(tasks, task);
+
+struct fds {
+	struct pollfd *list;
+	size_t listsz;
+};
+
+struct fetch {
+	char buf[SCI_MSG_MAX];
+	FILE *bufp;
+};
+
+static struct {
+	char *url;
+	char *worker;
+	int maxbuilds;
+} config = {
+	.url = "http://localhost",
+	.worker = "default",
+	.maxbuilds = 4
+};
+
+static struct tasks tasks = TAILQ_HEAD_INITIALIZER(tasks);
+static struct worker worker;
+static int alive = 1;
+
+/*
+ * Show usage and exit with code 1.
+ */
+noreturn static void
+usage(void)
+{
+	fprintf(stderr, "usage: %s [-m maxbuild] [-u url] [-w worker]\n", getprogname());
+	exit(1);
+}
+
+/*
+ * Find a task by its id.
+ */
+static inline struct task *
+find_by_fd(int fd)
+{
+	struct task *tk;
+
+	TAILQ_FOREACH(tk, &tasks, link)
+		if (tk->pipe[0] == fd)
+			return tk;
+
+	return NULL;
+}
+
+/*
+ * Find a task by its pid number.
+ */
+static inline struct task *
+find_by_pid(pid_t pid)
+{
+	struct task *t;
+
+	TAILQ_FOREACH(t, &tasks, link)
+		if (t->child == pid)
+			return t;
+
+	return NULL;
+}
+
+/*
+ * Destroy a task entirely.
+ */
+static void
+destroy(struct task *tk)
+{
+	log_debug("destroying task %d", tk->job_id);
+	unlink(tk->script);
+
+	if (tk->pipe[0])
+		close(tk->pipe[0]);
+	if (tk->pipe[1])
+		close(tk->pipe[1]);
+	if (tk->scriptfd)
+		close(tk->scriptfd);
+
+	TAILQ_REMOVE(&tasks, tk, link);
+	memset(tk, 0, sizeof (*tk));
+	free(tk);
+}
+
+static const char *
+makeurl(const char *fmt, ...)
+{
+	assert(fmt);
+
+	static char url[256];
+	char page[128] = {0};
+	va_list ap;
+
+	va_start(ap, fmt);
+	vsnprintf(page, sizeof (page), fmt, ap);
+	va_end(ap);
+
+	snprintf(url, sizeof (url), "%s/%s", config.url, page);
+
+	return url;
+}
+
+static void
+complete(int signum, siginfo_t *sinfo, void *ctx)
+{
+	(void)ctx;
+	(void)signum;
+
+	struct task *tk;
+
+	if (waitpid(sinfo->si_pid, NULL, 0) < 0)
+		log_warn("waitpid: %s", strerror(errno));
+
+	if ((tk = find_by_pid(sinfo->si_pid))) {
+		log_debug("process %d terminated (exitcode=%d)",
+		    (int)sinfo->si_pid, sinfo->si_status);
+
+		close(tk->pipe[1]);
+		tk->status = TASKST_COMPLETED;
+		tk->exitcode = sinfo->si_status;
+		tk->pipe[1] = 0;
+	}
+}
+
+static void
+stop(int signum)
+{
+	log_warn("exiting on signal %d", signum);
+	alive = 0;
+}
+
+static char *
+uploadenc(const struct task *tk)
+{
+	json_t *doc;
+
+	struct jobresult res = {0};
+	char *dump;
+
+	res.job_id = tk->job_id;
+	res.exitcode = tk->exitcode;
+	res.log = tk->out;
+	res.worker_id = worker.id;
+
+	doc = jobresult_to(&res, 1);
+	dump = json_dumps(doc, JSON_COMPACT);
+
+	json_decref(doc);
+
+	return dump;
+}
+
+static size_t
+getcb(char *in, size_t n, size_t w, FILE *fp)
+{
+	if (fwrite(in, n, w, fp) != w)
+		return log_warn("get: %s", strerror(errno)), 0;
+
+	return w;
+}
+
+static json_t *
+get(const char *topic, const char *url)
+{
+	CURL *curl;
+	CURLcode code;
+
+	json_t *doc;
+	json_error_t error;
+
+	char buf[SCI_MSG_MAX];
+	long status;
+	FILE *fp;
+
+	curl = curl_easy_init();
+
+	if (!(fp = fmemopen(buf, sizeof (buf), "w")))
+		err(1, "fmemopen");
+
+	curl_easy_setopt(curl, CURLOPT_URL, url);
+	curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
+	curl_easy_setopt(curl, CURLOPT_TIMEOUT, 3L);
+	curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, getcb);
+	curl_easy_setopt(curl, CURLOPT_WRITEDATA, fp);
+
+	if ((code = curl_easy_perform(curl)) != CURLE_OK)
+		log_warn("%s: %s", topic, curl_easy_strerror(code));
+
+	curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status);
+	curl_easy_cleanup(curl);
+
+	fclose(fp);
+
+	if (code != CURLE_OK)
+		return log_warn("%s: %s", topic, curl_easy_strerror(code)), NULL;
+	if (status != 200)
+		return log_warn("%s: unexpected status code %ld", topic, status), NULL;
+	if (!(doc = json_loads(buf, 0, &error)))
+		return log_warn("%s: %s", topic, error.text), NULL;
+
+	return doc;
+}
+
+static size_t
+silent(char *in, size_t n, size_t w, void *data)
+{
+	(void)in;
+	(void)n;
+	(void)data;
+
+	return w;
+}
+
+static void
+upload(struct task *tk)
+{
+	CURL *curl;
+	CURLcode code;
+	struct curl_slist *headers = NULL;
+	long status;
+	char *dump;
+
+	curl = curl_easy_init();
+	headers = curl_slist_append(headers, "Content-Type: application/json");
+	curl_easy_setopt(curl, CURLOPT_URL, makeurl("api/v1/jobs"));
+	//curl_easy_setopt(curl, CURLOPT_URL, "http://localhost:4000");
+	curl_easy_setopt(curl, CURLOPT_TIMEOUT, 3L);
+	curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
+	curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, silent);
+	curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (dump = uploadenc(tk)));
+	curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, strlen(dump));
+	curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
+	code = curl_easy_perform(curl);
+	curl_slist_free_all(headers);
+
+	/*
+	 * If we fail to upload data, we put the result into syncing mode so
+	 * that we retry later without redoing the job over and over
+	 */
+	tk->status = TASKST_SYNCING;
+
+	if (code != CURLE_OK)
+		log_warn("upload: %s", curl_easy_strerror(code));
+	else {
+		curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status);
+
+		if (status != 200)
+			log_warn("upload: unexpected return code: %ld", status);
+		else
+			destroy(tk);
+	}
+
+	free(dump);
+	curl_easy_cleanup(curl);
+}
+
+static inline int
+pending(int id)
+{
+	struct task *t;
+
+	TAILQ_FOREACH(t, &tasks, link)
+		if (t->job_id == id)
+			return 1;
+
+	return 0;
+}
+
+static void
+queue(int id, int project_id, const char *tag)
+{
+	struct task *tk;
+
+	log_info("queued job build (%d) for tag %s\n", id, tag);
+
+	tk = util_calloc(1, sizeof (*tk));
+	tk->job_id = id;
+	tk->project_id = project_id;
+	strlcpy(tk->job_tag, tag, sizeof (tk->job_tag));
+
+	TAILQ_INSERT_TAIL(&tasks, tk, link);
+}
+
+static void
+merge(json_t *doc)
+{
+	struct job jobs[SCI_JOB_LIST_MAX];
+	ssize_t jobsz;
+
+	if ((jobsz = job_from(jobs, UTIL_SIZE(jobs), doc)) < 0)
+		log_warn("fetchjobs: %s", strerror(errno));
+	else {
+		for (ssize_t i = 0; i < jobsz; ++i) {
+			if (!pending(jobs[i].id))
+				queue(jobs[i].id, jobs[i].project_id, jobs[i].tag);
+		}
+	}
+
+	json_decref(doc);
+}
+
+static void
+fetchjobs(void)
+{
+	json_t *doc;
+
+	if (!(doc = get("fetch", makeurl("api/v1/jobs/%s", config.worker))))
+		log_warn("unable to retrieve jobs");
+	else
+		merge(doc);
+}
+
+/*
+ * This function reads stdout/stderr pipe from child and optionally remove them
+ * if they have completed.
+ */
+static void
+readall(struct fds *fds)
+{
+	struct task *tk;
+	char buf[BUFSIZ];
+	ssize_t nr;
+
+	for (size_t i = 0; i < fds->listsz; ++i) {
+		if (fds->list[i].revents == 0)
+			continue;
+		if (!(tk = find_by_fd(fds->list[i].fd)))
+			continue;
+
+		/* Read stdout/stderr from children pipe. */
+		if ((nr = read(fds->list[i].fd, buf, sizeof (buf) - 1)) <= 0)
+			tk->status = TASKST_SYNCING;
+		else {
+			buf[nr] = 0;
+			strlcat(tk->out, buf, sizeof (tk->out));
+		}
+	}
+}
+
+/*
+ * Retrieve status code from spawned process complete or upload again if they
+ * failed to sync.
+ */
+static void
+flushall(void)
+{
+	struct task *tk, *tmp;
+
+	TAILQ_FOREACH_SAFE(tk, &tasks, link, tmp)
+		if (tk->status == TASKST_SYNCING)
+			upload(tk);
+}
+
+static int
+extract(struct task *tk, json_t *doc)
+{
+	struct project proj;
+	size_t len;
+
+	if (project_from(&proj, 1, doc) < 0) {
+		json_decref(doc);
+		log_warn("fetchproject: %s", strerror(errno));
+		return -1;
+	}
+
+	len = strlen(proj.script);
+
+	if ((size_t)write(tk->scriptfd, proj.script, len) != len) {
+		json_decref(doc);
+		log_warn("fetchproject: %s", strerror(errno));
+		return -1;
+	}
+
+	/* Close so we can finally spawn it. */
+	close(tk->scriptfd);
+	tk->scriptfd = 0;
+
+	return 0;
+}
+
+static int
+fetchproject(struct task *tk)
+{
+	json_t *doc;
+
+	if (!(doc = get("fetchproject", makeurl("api/v1/projects/%d", tk->project_id))))
+		return -1;
+
+	return extract(tk, doc);
+}
+
+/*
+ * Create a task to run the script. This will retrieve the project script code
+ * at this moment and put it in a temporary file.
+ */
+static void
+createtask(struct task *tk)
+{
+	if (tk->status != TASKST_PENDING)
+		return;
+
+	log_debug("creating task (id=%d, tag=%s)", tk->job_id, tk->job_tag);
+	snprintf(tk->script, sizeof (tk->script), "/tmp/sciworkerd-%d-XXXXXX", tk->job_id);
+
+	if ((tk->scriptfd = mkstemp(tk->script)) < 0 ||
+	    fchmod(tk->scriptfd, S_IRUSR | S_IWUSR | S_IXUSR) < 0) {
+		unlink(tk->script);
+		log_warn("%s", strerror(errno));
+		return;
+	}
+
+	if (fetchproject(tk) < 0) {
+		unlink(tk->script);
+		close(tk->scriptfd);
+		tk->scriptfd = 0;
+	} else
+		spawn(tk);
+}
+
+/*
+ * Start all pending tasks if the limit of running tasks is not reached.
+ */
+static void
+startall(void)
+{
+	size_t nrunning = 0;
+	struct task *tk;
+
+	TAILQ_FOREACH(tk, &tasks, link)
+		if (tk->status == TASKST_RUNNING)
+			++nrunning;
+
+	if (nrunning >= (size_t)config.maxbuilds)
+		log_debug("not spawning new process because limit is reached");
+	else {
+		tk = TAILQ_FIRST(&tasks);
+
+		while (tk && nrunning++ < (size_t)config.maxbuilds) {
+			createtask(tk);
+			tk = TAILQ_NEXT(tk, link);
+		}
+	}
+}
+
+static void
+fetchworker(void)
+{
+	json_t *doc;
+
+	if (!(doc = get("fetchworker", makeurl("api/v1/workers/%s", config.worker))) ||
+	    worker_from(&worker, 1, doc) < 0)
+		errx(1, "unable to retrieve worker id");
+
+	log_info("worker id: %d", worker.id);
+	log_info("worker name: %s", worker.name);
+	log_info("worker description: %s", worker.desc);
+
+	json_decref(doc);
+}
+
+static void
+init(void)
+{
+	struct sigaction sa;
+
+	sa.sa_flags = SA_SIGINFO | SA_RESTART;
+	sa.sa_sigaction = complete;
+	sigemptyset(&sa.sa_mask);
+
+	if (sigaction(SIGCHLD, &sa, NULL) < 0)
+		err(1, "sigaction");
+
+	sa.sa_flags = SA_RESTART;
+	sa.sa_handler = stop;
+	sigemptyset(&sa.sa_mask);
+
+	if (sigaction(SIGTERM, &sa, NULL) < 0 || sigaction(SIGINT, &sa, NULL) < 0)
+		err(1, "sigaction");
+
+	log_open("sciworkerd");
+	fetchworker();
+}
+
+static struct fds
+prepare(void)
+{
+	struct fds fds = {0};
+	struct task *tk;
+	size_t i = 0;
+
+	TAILQ_FOREACH(tk, &tasks, link)
+		if (tk->status == TASKST_RUNNING || tk->status == TASKST_COMPLETED)
+			fds.listsz++;
+
+	fds.list = util_calloc(fds.listsz, sizeof (*fds.list));
+
+	TAILQ_FOREACH(tk, &tasks, link) {
+		if (tk->status == TASKST_RUNNING || tk->status == TASKST_COMPLETED) {
+			fds.list[i].fd = tk->pipe[0];
+			fds.list[i++].events = POLLIN | POLLPRI;
+		}
+	}
+
+	return fds;
+}
+
+static void
+run(void)
+{
+	struct fds fds;
+
+	fds = prepare();
+
+	if (poll(fds.list, fds.listsz, 5000) < 0 && errno != EINTR)
+		err(1, "poll");
+
+	fetchjobs();
+	readall(&fds);
+	startall();
+	flushall();
+}
+
+static void
+finish(void)
+{
+	size_t tot = 0;
+	struct task *tk, *tmp;
+
+	TAILQ_FOREACH(tk, &tasks, link)
+		tot++;
+
+	signal(SIGCHLD, SIG_IGN);
+	log_debug("killing remaining %zu tasks", tot);
+
+	TAILQ_FOREACH_SAFE(tk, &tasks, link, tmp) {
+		if (tk->status == TASKST_RUNNING) {
+			kill(tk->child, SIGTERM);
+			waitpid(tk->child, NULL, 0);
+		}
+
+		destroy(tk);
+	}
+}
+
+int
+main(int argc, char **argv)
+{
+	int ch;
+	const char *errstr;
+
+	setprogname("sciworkerd");
+
+	while ((ch = getopt(argc, argv, "m:u:w:")) != -1) {
+		switch (ch) {
+		case 'm':
+			config.maxbuilds = strtonum(optarg, 0, INT_MAX, &errstr);
+
+			if (errstr)
+				errx(1, "%s: %s", optarg, errstr);
+
+			break;
+		case 'u':
+			config.url = optarg;
+			break;
+		case 'w':
+			config.worker = optarg;
+			break;
+		default:
+			usage();
+			break;
+		}
+	}
+
+	init();
+
+	while (alive)
+		run();
+
+	finish();
+}
+#endif
+
+
+
+
+
+
+
+
+
+
+#include <err.h>
+#include <errno.h>
+#include <poll.h>
+#include <signal.h>
+#include <string.h>
+#include <time.h>
+#include <unistd.h>
+
+#include "types.h"
+#include "task.h"
+
+#define SCRIPT \
+	"#!/bin/sh\n" \
+	"echo yes\n" \
+	"sleep 10\n" \
+	"echo no 1>&2\n" \
+	"sleep 1\n" \
+	"exit 1"
+
+int
+main(void)
+{
+	struct job job = {
+		.project_id = 10,
+		.id = 10,
+		.tag = "1234"
+	};
+	struct sigaction sa = {0};
+	struct pollfd fd;
+	struct task *t;
+	int run = 1;
+
+	t = task_new(&job);
+
+	if (task_setup(t, SCRIPT) < 0)
+		err(1, "task_set_script");
+	if (task_start(t) < 0)
+		err(1, "task_start");
+
+	while (run) {
+		if (difftime(time(NULL), task_uptime(t)) >= 3) {
+			printf("task timeout !\n");
+			task_kill(t);
+			task_wait(t);
+			break;
+		}
+
+		task_prepare(t, &fd);
+
+		if (poll(&fd, 1, 250) < 0 && errno != EINTR)
+			err(1, "poll");
+
+		switch (task_sync(t, &fd)) {
+		case -1:
+			err(1, "task_sync");
+		case 0:
+			run = 0;
+			task_wait(t);
+			break;
+		default:
+			/* Keep going... */
+			break;
+		}
+	}
+
+	switch (task_status(t)) {
+	case TASKSTATUS_EXITED:
+		printf("process exited with code: %d\n", task_code(t).exitcode);
+		break;
+	case TASKSTATUS_KILLED:
+		printf("process killed with signal %d\n", task_code(t).sigcode);
+		break;
+	default:
+		break;
+	}
+
+	printf("== console ==\n%s==\n", task_console(t));
+	task_free(t);
+}