diff sciworkerd/main.c @ 19:de4bf839b565

misc: revamp SQL
author David Demelier <markand@malikania.fr>
date Fri, 15 Jul 2022 11:11:48 +0200
parents 600204c31bf0
children 2cb228f23f53
line wrap: on
line diff
--- a/sciworkerd/main.c	Tue Jul 12 20:20:51 2022 +0200
+++ b/sciworkerd/main.c	Fri Jul 15 11:11:48 2022 +0200
@@ -16,719 +16,49 @@
  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  */
 
-#if 0
-
-#include <sys/queue.h>
-#include <sys/stat.h>
-#include <sys/types.h>
-#include <sys/wait.h>
-#include <assert.h>
-#include <err.h>
-#include <errno.h>
-#include <fcntl.h>
-#include <limits.h>
-#include <poll.h>
-#include <signal.h>
 #include <stdio.h>
 #include <stdlib.h>
-#include <stdnoreturn.h>
-#include <string.h>
 #include <unistd.h>
 
-#include <curl/curl.h>
-#include <jansson.h>
-
-#include "config.h"
-#include "log.h"
-#include "types.h"
-#include "util.h"
-
-#define TAG_MAX 256
-
-struct task {
-	enum taskst status;
-	pid_t child;
-	int pipe[2];
-	int exitcode;
-	int job_id;
-	int project_id;
-	char job_tag[TAG_MAX];
-	char out[SCI_CONSOLE_MAX];
-	char script[PATH_MAX];
-	int scriptfd;
-	TAILQ_ENTRY(task) link;
-};
-
-TAILQ_HEAD(tasks, task);
-
-struct fds {
-	struct pollfd *list;
-	size_t listsz;
-};
-
-struct fetch {
-	char buf[SCI_MSG_MAX];
-	FILE *bufp;
-};
-
-static struct {
-	char *url;
-	char *worker;
-	int maxbuilds;
-} config = {
-	.url = "http://localhost",
-	.worker = "default",
-	.maxbuilds = 4
-};
-
-static struct tasks tasks = TAILQ_HEAD_INITIALIZER(tasks);
-static struct worker worker;
-static int alive = 1;
-
-/*
- * Show usage and exit with code 1.
- */
-noreturn static void
-usage(void)
-{
-	fprintf(stderr, "usage: %s [-m maxbuild] [-u url] [-w worker]\n", getprogname());
-	exit(1);
-}
-
-/*
- * Find a task by its id.
- */
-static inline struct task *
-find_by_fd(int fd)
-{
-	struct task *tk;
-
-	TAILQ_FOREACH(tk, &tasks, link)
-		if (tk->pipe[0] == fd)
-			return tk;
-
-	return NULL;
-}
-
-/*
- * Find a task by its pid number.
- */
-static inline struct task *
-find_by_pid(pid_t pid)
-{
-	struct task *t;
-
-	TAILQ_FOREACH(t, &tasks, link)
-		if (t->child == pid)
-			return t;
-
-	return NULL;
-}
-
-/*
- * Destroy a task entirely.
- */
-static void
-destroy(struct task *tk)
-{
-	log_debug("destroying task %d", tk->job_id);
-	unlink(tk->script);
-
-	if (tk->pipe[0])
-		close(tk->pipe[0]);
-	if (tk->pipe[1])
-		close(tk->pipe[1]);
-	if (tk->scriptfd)
-		close(tk->scriptfd);
-
-	TAILQ_REMOVE(&tasks, tk, link);
-	memset(tk, 0, sizeof (*tk));
-	free(tk);
-}
-
-static const char *
-makeurl(const char *fmt, ...)
-{
-	assert(fmt);
-
-	static char url[256];
-	char page[128] = {0};
-	va_list ap;
-
-	va_start(ap, fmt);
-	vsnprintf(page, sizeof (page), fmt, ap);
-	va_end(ap);
-
-	snprintf(url, sizeof (url), "%s/%s", config.url, page);
-
-	return url;
-}
-
-static void
-complete(int signum, siginfo_t *sinfo, void *ctx)
-{
-	(void)ctx;
-	(void)signum;
-
-	struct task *tk;
-
-	if (waitpid(sinfo->si_pid, NULL, 0) < 0)
-		log_warn("waitpid: %s", strerror(errno));
-
-	if ((tk = find_by_pid(sinfo->si_pid))) {
-		log_debug("process %d terminated (exitcode=%d)",
-		    (int)sinfo->si_pid, sinfo->si_status);
-
-		close(tk->pipe[1]);
-		tk->status = TASKST_COMPLETED;
-		tk->exitcode = sinfo->si_status;
-		tk->pipe[1] = 0;
-	}
-}
-
-static void
-stop(int signum)
-{
-	log_warn("exiting on signal %d", signum);
-	alive = 0;
-}
-
-static char *
-uploadenc(const struct task *tk)
-{
-	json_t *doc;
-
-	struct jobresult res = {0};
-	char *dump;
-
-	res.job_id = tk->job_id;
-	res.exitcode = tk->exitcode;
-	res.log = tk->out;
-	res.worker_id = worker.id;
-
-	doc = jobresult_to(&res, 1);
-	dump = json_dumps(doc, JSON_COMPACT);
-
-	json_decref(doc);
-
-	return dump;
-}
-
-static size_t
-getcb(char *in, size_t n, size_t w, FILE *fp)
-{
-	if (fwrite(in, n, w, fp) != w)
-		return log_warn("get: %s", strerror(errno)), 0;
-
-	return w;
-}
-
-static json_t *
-get(const char *topic, const char *url)
-{
-	CURL *curl;
-	CURLcode code;
-
-	json_t *doc;
-	json_error_t error;
-
-	char buf[SCI_MSG_MAX];
-	long status;
-	FILE *fp;
-
-	curl = curl_easy_init();
-
-	if (!(fp = fmemopen(buf, sizeof (buf), "w")))
-		err(1, "fmemopen");
-
-	curl_easy_setopt(curl, CURLOPT_URL, url);
-	curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
-	curl_easy_setopt(curl, CURLOPT_TIMEOUT, 3L);
-	curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, getcb);
-	curl_easy_setopt(curl, CURLOPT_WRITEDATA, fp);
-
-	if ((code = curl_easy_perform(curl)) != CURLE_OK)
-		log_warn("%s: %s", topic, curl_easy_strerror(code));
-
-	curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status);
-	curl_easy_cleanup(curl);
-
-	fclose(fp);
-
-	if (code != CURLE_OK)
-		return log_warn("%s: %s", topic, curl_easy_strerror(code)), NULL;
-	if (status != 200)
-		return log_warn("%s: unexpected status code %ld", topic, status), NULL;
-	if (!(doc = json_loads(buf, 0, &error)))
-		return log_warn("%s: %s", topic, error.text), NULL;
-
-	return doc;
-}
-
-static size_t
-silent(char *in, size_t n, size_t w, void *data)
-{
-	(void)in;
-	(void)n;
-	(void)data;
-
-	return w;
-}
+#include "sciworkerd.h"
 
 static void
-upload(struct task *tk)
-{
-	CURL *curl;
-	CURLcode code;
-	struct curl_slist *headers = NULL;
-	long status;
-	char *dump;
-
-	curl = curl_easy_init();
-	headers = curl_slist_append(headers, "Content-Type: application/json");
-	curl_easy_setopt(curl, CURLOPT_URL, makeurl("api/v1/jobs"));
-	//curl_easy_setopt(curl, CURLOPT_URL, "http://localhost:4000");
-	curl_easy_setopt(curl, CURLOPT_TIMEOUT, 3L);
-	curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
-	curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, silent);
-	curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (dump = uploadenc(tk)));
-	curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, strlen(dump));
-	curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
-	code = curl_easy_perform(curl);
-	curl_slist_free_all(headers);
-
-	/*
-	 * If we fail to upload data, we put the result into syncing mode so
-	 * that we retry later without redoing the job over and over
-	 */
-	tk->status = TASKST_SYNCING;
-
-	if (code != CURLE_OK)
-		log_warn("upload: %s", curl_easy_strerror(code));
-	else {
-		curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status);
-
-		if (status != 200)
-			log_warn("upload: unexpected return code: %ld", status);
-		else
-			destroy(tk);
-	}
-
-	free(dump);
-	curl_easy_cleanup(curl);
-}
-
-static inline int
-pending(int id)
-{
-	struct task *t;
-
-	TAILQ_FOREACH(t, &tasks, link)
-		if (t->job_id == id)
-			return 1;
-
-	return 0;
-}
-
-static void
-queue(int id, int project_id, const char *tag)
-{
-	struct task *tk;
-
-	log_info("queued job build (%d) for tag %s\n", id, tag);
-
-	tk = util_calloc(1, sizeof (*tk));
-	tk->job_id = id;
-	tk->project_id = project_id;
-	strlcpy(tk->job_tag, tag, sizeof (tk->job_tag));
-
-	TAILQ_INSERT_TAIL(&tasks, tk, link);
-}
-
-static void
-merge(json_t *doc)
+env(void)
 {
-	struct job jobs[SCI_JOB_LIST_MAX];
-	ssize_t jobsz;
-
-	if ((jobsz = job_from(jobs, UTIL_SIZE(jobs), doc)) < 0)
-		log_warn("fetchjobs: %s", strerror(errno));
-	else {
-		for (ssize_t i = 0; i < jobsz; ++i) {
-			if (!pending(jobs[i].id))
-				queue(jobs[i].id, jobs[i].project_id, jobs[i].tag);
-		}
-	}
-
-	json_decref(doc);
-}
-
-static void
-fetchjobs(void)
-{
-	json_t *doc;
-
-	if (!(doc = get("fetch", makeurl("api/v1/jobs/%s", config.worker))))
-		log_warn("unable to retrieve jobs");
-	else
-		merge(doc);
-}
-
-/*
- * This function reads stdout/stderr pipe from child and optionally remove them
- * if they have completed.
- */
-static void
-readall(struct fds *fds)
-{
-	struct task *tk;
-	char buf[BUFSIZ];
-	ssize_t nr;
-
-	for (size_t i = 0; i < fds->listsz; ++i) {
-		if (fds->list[i].revents == 0)
-			continue;
-		if (!(tk = find_by_fd(fds->list[i].fd)))
-			continue;
-
-		/* Read stdout/stderr from children pipe. */
-		if ((nr = read(fds->list[i].fd, buf, sizeof (buf) - 1)) <= 0)
-			tk->status = TASKST_SYNCING;
-		else {
-			buf[nr] = 0;
-			strlcat(tk->out, buf, sizeof (tk->out));
-		}
-	}
-}
-
-/*
- * Retrieve status code from spawned process complete or upload again if they
- * failed to sync.
- */
-static void
-flushall(void)
-{
-	struct task *tk, *tmp;
-
-	TAILQ_FOREACH_SAFE(tk, &tasks, link, tmp)
-		if (tk->status == TASKST_SYNCING)
-			upload(tk);
-}
-
-static int
-extract(struct task *tk, json_t *doc)
-{
-	struct project proj;
-	size_t len;
-
-	if (project_from(&proj, 1, doc) < 0) {
-		json_decref(doc);
-		log_warn("fetchproject: %s", strerror(errno));
-		return -1;
-	}
-
-	len = strlen(proj.script);
-
-	if ((size_t)write(tk->scriptfd, proj.script, len) != len) {
-		json_decref(doc);
-		log_warn("fetchproject: %s", strerror(errno));
-		return -1;
-	}
-
-	/* Close so we can finally spawn it. */
-	close(tk->scriptfd);
-	tk->scriptfd = 0;
+	const char *env;
 
-	return 0;
-}
-
-static int
-fetchproject(struct task *tk)
-{
-	json_t *doc;
-
-	if (!(doc = get("fetchproject", makeurl("api/v1/projects/%d", tk->project_id))))
-		return -1;
-
-	return extract(tk, doc);
-}
-
-/*
- * Create a task to run the script. This will retrieve the project script code
- * at this moment and put it in a temporary file.
- */
-static void
-createtask(struct task *tk)
-{
-	if (tk->status != TASKST_PENDING)
-		return;
-
-	log_debug("creating task (id=%d, tag=%s)", tk->job_id, tk->job_tag);
-	snprintf(tk->script, sizeof (tk->script), "/tmp/sciworkerd-%d-XXXXXX", tk->job_id);
-
-	if ((tk->scriptfd = mkstemp(tk->script)) < 0 ||
-	    fchmod(tk->scriptfd, S_IRUSR | S_IWUSR | S_IXUSR) < 0) {
-		unlink(tk->script);
-		log_warn("%s", strerror(errno));
-		return;
-	}
-
-	if (fetchproject(tk) < 0) {
-		unlink(tk->script);
-		close(tk->scriptfd);
-		tk->scriptfd = 0;
-	} else
-		spawn(tk);
-}
-
-/*
- * Start all pending tasks if the limit of running tasks is not reached.
- */
-static void
-startall(void)
-{
-	size_t nrunning = 0;
-	struct task *tk;
-
-	TAILQ_FOREACH(tk, &tasks, link)
-		if (tk->status == TASKST_RUNNING)
-			++nrunning;
-
-	if (nrunning >= (size_t)config.maxbuilds)
-		log_debug("not spawning new process because limit is reached");
-	else {
-		tk = TAILQ_FIRST(&tasks);
-
-		while (tk && nrunning++ < (size_t)config.maxbuilds) {
-			createtask(tk);
-			tk = TAILQ_NEXT(tk, link);
-		}
-	}
-}
-
-static void
-fetchworker(void)
-{
-	json_t *doc;
-
-	if (!(doc = get("fetchworker", makeurl("api/v1/workers/%s", config.worker))) ||
-	    worker_from(&worker, 1, doc) < 0)
-		errx(1, "unable to retrieve worker id");
-
-	log_info("worker id: %d", worker.id);
-	log_info("worker name: %s", worker.name);
-	log_info("worker description: %s", worker.desc);
-
-	json_decref(doc);
-}
-
-static void
-init(void)
-{
-	struct sigaction sa;
-
-	sa.sa_flags = SA_SIGINFO | SA_RESTART;
-	sa.sa_sigaction = complete;
-	sigemptyset(&sa.sa_mask);
-
-	if (sigaction(SIGCHLD, &sa, NULL) < 0)
-		err(1, "sigaction");
-
-	sa.sa_flags = SA_RESTART;
-	sa.sa_handler = stop;
-	sigemptyset(&sa.sa_mask);
-
-	if (sigaction(SIGTERM, &sa, NULL) < 0 || sigaction(SIGINT, &sa, NULL) < 0)
-		err(1, "sigaction");
-
-	log_open("sciworkerd");
-	fetchworker();
-}
-
-static struct fds
-prepare(void)
-{
-	struct fds fds = {0};
-	struct task *tk;
-	size_t i = 0;
-
-	TAILQ_FOREACH(tk, &tasks, link)
-		if (tk->status == TASKST_RUNNING || tk->status == TASKST_COMPLETED)
-			fds.listsz++;
-
-	fds.list = util_calloc(fds.listsz, sizeof (*fds.list));
-
-	TAILQ_FOREACH(tk, &tasks, link) {
-		if (tk->status == TASKST_RUNNING || tk->status == TASKST_COMPLETED) {
-			fds.list[i].fd = tk->pipe[0];
-			fds.list[i++].events = POLLIN | POLLPRI;
-		}
-	}
-
-	return fds;
-}
-
-static void
-run(void)
-{
-	struct fds fds;
-
-	fds = prepare();
-
-	if (poll(fds.list, fds.listsz, 5000) < 0 && errno != EINTR)
-		err(1, "poll");
-
-	fetchjobs();
-	readall(&fds);
-	startall();
-	flushall();
-}
-
-static void
-finish(void)
-{
-	size_t tot = 0;
-	struct task *tk, *tmp;
-
-	TAILQ_FOREACH(tk, &tasks, link)
-		tot++;
-
-	signal(SIGCHLD, SIG_IGN);
-	log_debug("killing remaining %zu tasks", tot);
-
-	TAILQ_FOREACH_SAFE(tk, &tasks, link, tmp) {
-		if (tk->status == TASKST_RUNNING) {
-			kill(tk->child, SIGTERM);
-			waitpid(tk->child, NULL, 0);
-		}
-
-		destroy(tk);
-	}
+	if ((env = getenv("SCI_URL")))
+		snprintf(sciworkerd.url, sizeof (sciworkerd.url), "%s", optarg);
+	if ((env = getenv("SCI_WORKER")))
+		snprintf(sciworkerd.name, sizeof (sciworkerd.name), "%s", optarg);
 }
 
 int
 main(int argc, char **argv)
 {
-	int ch;
-	const char *errstr;
+	int ch, val;
 
-	setprogname("sciworkerd");
+	env();
+	opterr = 0;
 
-	while ((ch = getopt(argc, argv, "m:u:w:")) != -1) {
+	while ((ch = getopt(argc, argv, "j:t:u:w:")) != -1) {
 		switch (ch) {
-		case 'm':
-			config.maxbuilds = strtonum(optarg, 0, INT_MAX, &errstr);
-
-			if (errstr)
-				errx(1, "%s: %s", optarg, errstr);
-
+		case 'j':
+			if ((val = atoi(optarg)) > 0)
+				sciworkerd.maxjobs = val;
+			break;
+		case 't':
+			if ((val = atoi(optarg)) > 0)
+				sciworkerd.timeout = val;
 			break;
 		case 'u':
-			config.url = optarg;
+			snprintf(sciworkerd.url, sizeof (sciworkerd.url), "%s", optarg);
 			break;
 		case 'w':
-			config.worker = optarg;
+			snprintf(sciworkerd.name, sizeof (sciworkerd.name), "%s", optarg);
 			break;
 		default:
-			usage();
 			break;
 		}
 	}
-
-	init();
-
-	while (alive)
-		run();
-
-	finish();
 }
-#endif
-
-
-
-
-
-
-
-
-
-
-#include <err.h>
-#include <errno.h>
-#include <poll.h>
-#include <signal.h>
-#include <string.h>
-#include <time.h>
-#include <unistd.h>
-
-#include "types.h"
-#include "task.h"
-
-#define SCRIPT \
-	"#!/bin/sh\n" \
-	"echo yes\n" \
-	"sleep 10\n" \
-	"echo no 1>&2\n" \
-	"sleep 1\n" \
-	"exit 1"
-
-int
-main(void)
-{
-	struct job job = {
-		.project_id = 10,
-		.id = 10,
-		.tag = "1234"
-	};
-	struct sigaction sa = {0};
-	struct pollfd fd;
-	struct task *t;
-	int run = 1;
-
-	t = task_new(&job);
-
-	if (task_setup(t, SCRIPT) < 0)
-		err(1, "task_set_script");
-	if (task_start(t) < 0)
-		err(1, "task_start");
-
-	while (run) {
-		if (difftime(time(NULL), task_uptime(t)) >= 3) {
-			printf("task timeout !\n");
-			task_kill(t);
-			task_wait(t);
-			break;
-		}
-
-		task_prepare(t, &fd);
-
-		if (poll(&fd, 1, 250) < 0 && errno != EINTR)
-			err(1, "poll");
-
-		switch (task_sync(t, &fd)) {
-		case -1:
-			err(1, "task_sync");
-		case 0:
-			run = 0;
-			task_wait(t);
-			break;
-		default:
-			/* Keep going... */
-			break;
-		}
-	}
-
-	switch (task_status(t)) {
-	case TASKSTATUS_EXITED:
-		printf("process exited with code: %d\n", task_code(t).exitcode);
-		break;
-	case TASKSTATUS_KILLED:
-		printf("process killed with signal %d\n", task_code(t).sigcode);
-		break;
-	default:
-		break;
-	}
-
-	printf("== console ==\n%s==\n", task_console(t));
-	task_free(t);
-}