diff sciworkerd/sciworkerd.c @ 19:de4bf839b565

misc: revamp SQL
author David Demelier <markand@malikania.fr>
date Fri, 15 Jul 2022 11:11:48 +0200
parents
children f98ea578b1ef
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sciworkerd/sciworkerd.c	Fri Jul 15 11:11:48 2022 +0200
@@ -0,0 +1,393 @@
+#include <errno.h>
+#include <poll.h>
+#include <signal.h>
+#include <string.h>
+#include <time.h>
+
+#include <utlist.h>
+
+#include "apic.h"
+#include "log.h"
+#include "sciworkerd.h"
+#include "task.h"
+#include "types.h"
+#include "util.h"
+
+/* Log prefix for every message emitted by this module (was "sigworkerd",
+ * a typo for the sciworkerd daemon name). */
+#define TAG "sciworkerd: "
+
+/* Linked-list node binding a spawned task to the job it builds. */
+struct taskentry {
+	struct task *task;      /* script process handle (task.h) */
+	struct job job;         /* job description, copied at queue time */
+	struct taskentry *next; /* utlist linkage */
+};
+
+/* Task queues: entries move pending -> running -> finished. */
+static struct taskentry *taskpending;   /* queued, not started yet */
+static struct taskentry *tasks;         /* currently running */
+static struct taskentry *taskfinished;  /* done, waiting to be published */
+static struct worker worker;            /* this worker's identity, fetched at init */
+static int run = 1;                     /* cleared by the signal handler to stop the loop */
+
+/* Global configuration with built-in defaults. */
+struct sciworkerd sciworkerd = {
+	.fetchinterval = 300,   /* seconds between job fetches */
+	.maxjobs = 4,           /* maximum concurrent tasks */
+	.timeout = 600          /* seconds before a running task is killed */
+};
+
+/*
+ * Release an entry entirely. A task still running is killed first and,
+ * when the kill succeeded, reaped so no zombie is left behind.
+ */
+static inline void
+taskentry_free(struct taskentry *entry)
+{
+	const int status = task_status(entry->task);
+
+	if (status == TASKSTATUS_RUNNING && task_kill(entry->task) == 0)
+		task_wait(entry->task);
+
+	task_free(entry->task);
+	free(entry);
+}
+
+/*
+ * Signal handler for SIGINT/SIGTERM: request main loop termination by
+ * clearing `run`.
+ *
+ * NOTE(review): log_info is presumably not async-signal-safe; this is only
+ * acceptable if the logging backend is known to be reentrant -- confirm.
+ */
+static void
+stop(int sign)
+{
+	log_info(TAG "exiting on signal %d\n", sign);
+	run = 0;
+}
+
+/*
+ * Tell if a job with this id is already present in the pending queue.
+ * Returns 1 when found, 0 otherwise.
+ */
+static inline int
+pending(int id)
+{
+	const struct taskentry *it;
+
+	for (it = taskpending; it != NULL; it = it->next) {
+		if (it->job.id == id)
+			return 1;
+	}
+
+	return 0;
+}
+
+/*
+ * Append a freshly allocated pending entry built from the given job. The
+ * job description is copied into the entry.
+ */
+static inline void
+queue(const struct job *job)
+{
+	struct taskentry *entry;
+
+	log_info(TAG "queued job build (%d) for tag %s\n", job->id, job->tag);
+
+	entry = util_calloc(1, sizeof (*entry));
+	entry->task = task_new(job->tag);
+	entry->job = *job;
+	LL_APPEND(taskpending, entry);
+}
+
+/*
+ * Parse the jobs contained in doc and queue every one that is not already
+ * pending.
+ */
+static void
+merge(json_t *doc)
+{
+	struct job jobs[SCI_JOB_LIST_MAX];
+	ssize_t jobcount;
+	size_t added = 0;
+
+	jobcount = job_from(jobs, UTIL_SIZE(jobs), doc);
+
+	if (jobcount < 0) {
+		log_warn(TAG "error while parsing jobs: %s", strerror(errno));
+		return;
+	}
+
+	for (ssize_t i = 0; i < jobcount; ++i) {
+		if (pending(jobs[i].id))
+			continue;
+
+		queue(&jobs[i]);
+		added++;
+	}
+
+	log_info(TAG "added %zu new pending tasks", added);
+}
+
+/*
+ * Fetch jobs periodically, depending on the user setting.
+ */
+static void
+fetch_jobs(void)
+{
+	static time_t last;
+	time_t now;
+	struct apicreq req;
+
+	/* First call only arms the timer; fetching starts one interval later. */
+	if (!last)
+		last = time(NULL);
+
+	now = time(NULL);
+
+	if (difftime(now, last) < sciworkerd.fetchinterval)
+		return;
+
+	last = now;
+
+	if (apic_get(&req, "%s/api/v1/%s", sciworkerd.url, sciworkerd.name) < 0)
+		log_warn(TAG "unable to fetch jobs: %s", req.error);
+	if (req.doc) {
+		merge(req.doc);
+		json_decref(req.doc);
+	}
+}
+
+/*
+ * Fetch information about myself and store it into the global `worker`.
+ * Dies if the response is empty or unparsable since the daemon cannot
+ * operate without its identity.
+ */
+static void
+fetch_worker(void)
+{
+	struct apicreq req;
+
+	if (apic_get(&req, "%s/api/v1/workers/%s", sciworkerd.url, sciworkerd.name) < 0)
+		log_warn(TAG "unable to fetch worker info: %s", req.error);
+	if (!req.doc)
+		log_die(TAG "empty worker response");
+	/* Format fix: the original lacked the %s conversion, so the
+	 * strerror(errno) argument was silently dropped. */
+	if (worker_from(&worker, 1, req.doc) < 0)
+		log_die(TAG "unable to parse worker: %s", strerror(errno));
+
+	log_info("worker id: %d", worker.id);
+	log_info("worker name: %s", worker.name);
+	log_info("worker description: %s", worker.desc);
+
+	json_decref(req.doc);
+}
+
+/*
+ * Fetch information about a project into `project`. Returns 0 on success
+ * and -1 on any failure (network, empty body, parse error).
+ */
+static int
+fetch_project(struct project *project, int id)
+{
+	struct apicreq req;
+
+	/* Bug fix: the format consumes "%s" then "%d" but the original call
+	 * only passed `id`, leaving the URL argument missing (undefined
+	 * behavior in the varargs consumer). */
+	if (apic_get(&req, "%s/api/v1/projects/%d", sciworkerd.url, id) < 0)
+		return log_warn(TAG "unable to fetch project info: %s", req.error), -1;
+	if (!req.doc)
+		return log_warn(TAG "empty project response"), -1;
+	if (project_from(project, 1, req.doc) < 0)
+		return log_warn(TAG "unable to parse project: %s", strerror(errno)), -1;
+
+	return 0;
+}
+
+/*
+ * Return the number of entries in the given list.
+ */
+static inline size_t
+count(const struct taskentry *head)
+{
+	size_t n = 0;
+
+	while (head) {
+		n++;
+		head = head->next;
+	}
+
+	return n;
+}
+
+/*
+ * Start a task. We fetch its script code and then create the task with that
+ * script. Returns 0 on success, -1 if any step failed (the caller drops
+ * the task).
+ */
+static int
+start(struct taskentry *entry)
+{
+	struct project project;
+	pid_t pid;
+
+	if (fetch_project(&project, entry->job.project_id) < 0) {
+		log_warn(TAG "unable to fetch project, dropping task");
+		return -1;
+	}
+	if (task_setup(entry->task, project.script) < 0) {
+		log_warn(TAG "unable to setup script code: %s, dropping task", strerror(errno));
+		return -1;
+	}
+	if ((pid = task_start(entry->task)) < 0) {
+		log_warn(TAG "unable to spawn task process: %s", strerror(errno));
+		return -1;
+	}
+
+	log_info(TAG "task %lld spawned", (long long int)pid);
+
+	return 0;
+}
+
+/*
+ * Drop an entry from the pending queue and release it. Unlike
+ * taskentry_free() this never kills: it is only used for entries that
+ * failed to start and thus have no running process.
+ */
+static inline void
+delete(struct taskentry *entry)
+{
+	LL_DELETE(taskpending, entry);
+	task_free(entry->task);
+	free(entry);
+}
+
+/*
+ * Start pending tasks, moving them into the running list, while keeping
+ * the number of concurrent tasks under sciworkerd.maxjobs.
+ */
+static void
+start_all(void)
+{
+	size_t running = count(tasks);
+	struct taskentry *entry;
+
+	/*
+	 * Fill the free slots. The previous `while (running-- > 0)` form
+	 * started nothing at all when no task was running and never honored
+	 * the maxjobs limit.
+	 */
+	while (running < (size_t)sciworkerd.maxjobs && (entry = taskpending)) {
+		if (start(entry) < 0)
+			delete(entry);
+		else {
+			LL_DELETE(taskpending, entry);
+			LL_APPEND(tasks, entry);
+			running++;
+		}
+	}
+}
+
+/*
+ * Poll every running task's pipe, drain output, and move completed tasks
+ * (EOF or pipe error) to the finished queue.
+ */
+static void
+process_all(void)
+{
+	struct taskentry *iter, *next;
+	struct taskcode code;
+	struct pollfd *fds;
+	size_t fdsz, i = 0;
+	int ret;
+
+	/* First, read every pipes. */
+	if (!(fdsz = count(tasks)))
+		return;
+
+	fds = util_calloc(fdsz, sizeof (*fds));
+
+	for (iter = tasks; iter; iter = iter->next)
+		task_prepare(iter->task, &fds[i++]);
+
+	if (poll(fds, fdsz, 5000) < 0)
+		log_warn("poll: %s", strerror(errno));
+
+	/*
+	 * Bug fix: the original loop never advanced `iter`, so the head task
+	 * was synced fdsz times against every pollfd while the others were
+	 * never processed. `next` is captured first because a completed entry
+	 * is unlinked from `tasks` below.
+	 */
+	for (iter = tasks, i = 0; iter && i < fdsz; iter = next, ++i) {
+		next = iter->next;
+
+		/*
+		 *  0: EOF         [wait]
+		 * -1: error       [kill + wait]
+		 * >0: keep going  [nothing]
+		 */
+		if ((ret = task_sync(iter->task, &fds[i])) < 0) {
+			log_warn(TAG "pipe error: %s, killing task", strerror(errno));
+
+			if (task_kill(iter->task) < 0)
+				log_warn(TAG "task kill error: %s", strerror(errno));
+		}
+
+		/* Now wait for the task to complete. */
+		if (ret <= 0) {
+			if (task_wait(iter->task) < 0)
+				log_warn(TAG "task wait error: %s", strerror(errno));
+			else {
+				code = task_code(iter->task);
+
+				switch (task_status(iter->task)) {
+				case TASKSTATUS_EXITED:
+					log_info(TAG "task %lld exited with code %d",
+					    (long long int)task_pid(iter->task), code.exitcode);
+					break;
+				case TASKSTATUS_KILLED:
+					log_info(TAG "task %lld killed with signal %d",
+					    (long long int)task_pid(iter->task), code.sigcode);
+					break;
+				default:
+					break;
+				}
+			}
+
+			/* Remove that task and push to the outgoing queue. */
+			LL_DELETE(tasks, iter);
+			LL_APPEND(taskfinished, iter);
+		}
+	}
+
+	free(fds);
+}
+
+/*
+ * Kill all tasks that have been running for too long.
+ */
+static void
+ghost_all(void)
+{
+	struct taskentry *iter, *tmp;
+	time_t now;	/* NOTE(review): unused here. */
+
+	LL_FOREACH_SAFE(tasks, iter, tmp) {
+		/*
+		 * NOTE(review): this treats task_uptime() as a start timestamp
+		 * (difftime against now) despite its name suggesting an
+		 * already-elapsed duration -- confirm against task.h.
+		 */
+		if (difftime(time(NULL), task_uptime(iter->task)) < sciworkerd.timeout)
+			continue;
+
+		/* Do not attempt to wait if kill failed to avoid lock. */
+		log_info(TAG "task timeout, killing");
+
+		if (task_kill(iter->task) == 0)
+			task_wait(iter->task);
+
+		/* Move the reaped task to the outgoing (publish) queue. */
+		LL_DELETE(tasks, iter);
+		LL_APPEND(taskfinished, iter);
+	}
+}
+
+/*
+ * POST the result of a finished task to the server. Returns 0 on success,
+ * non-zero otherwise (the entry stays queued for a retry).
+ */
+static int
+publish(struct taskentry *entry)
+{
+	// TODO: add sigcode.
+	struct taskcode code = task_code(entry->task);
+	struct jobresult res = {
+		.job_id = entry->job.id,
+		.exitcode = code.exitcode,
+		.log = task_console(entry->task),
+		.worker_id = worker.id
+	};
+	struct apicreq req;
+	json_t *doc;
+	int ret;
+
+	doc = jobresult_to(&res, 1);
+	ret = apic_post(&req, doc, "%s/api/v1/jobs", sciworkerd.url);
+	json_decref(doc);
+
+	if (ret)
+		log_warn(TAG "unable to publish task: %s", req.error);
+	else
+		log_info(TAG "task successfully published");
+
+	return ret;
+}
+
+/*
+ * Try to publish every finished task. Entries that fail to publish are
+ * kept in the queue and retried on the next loop iteration.
+ */
+static void
+publish_all(void)
+{
+	struct taskentry *entry, *tmp;
+
+	LL_FOREACH_SAFE(taskfinished, entry, tmp) {
+		if (publish(entry) != 0)
+			continue;
+
+		LL_DELETE(taskfinished, entry);
+		taskentry_free(entry);
+	}
+}
+
+/*
+ * Initialize logging, install signal handlers for clean termination and
+ * fetch this worker's identity (dies on failure).
+ */
+void
+sciworkerd_init(void)
+{
+	struct sigaction sa = {0};
+
+	/* Fixed typo: the log identifier said "sigworkerd". */
+	log_open("sciworkerd");
+
+	sigemptyset(&sa.sa_mask);
+	sa.sa_handler = stop;
+
+	/* No SA_RESTART on purpose: blocking calls must be interrupted so the
+	 * main loop can observe `run` and exit. */
+	if (sigaction(SIGINT, &sa, NULL) < 0 || sigaction(SIGTERM, &sa, NULL) < 0)
+		log_die(TAG "sigaction: %s", strerror(errno));
+
+	fetch_worker();
+}
+
+/*
+ * Main loop: fetch new jobs, start pending ones, synchronize running
+ * tasks, kill timed-out ones and publish finished results.
+ */
+void
+sciworkerd_run(void)
+{
+	while (run) {
+		fetch_jobs();
+		/* Bug fix: start_all() was defined but never invoked, so queued
+		 * tasks could never begin running. */
+		start_all();
+		process_all();
+		ghost_all();
+		publish_all();
+	}
+}
+
+/*
+ * Release every queue; entries with a still-running task are killed and
+ * reaped by taskentry_free().
+ */
+void
+sciworkerd_finish(void)
+{
+	struct taskentry *heads[] = { taskpending, tasks, taskfinished };
+	struct taskentry *entry, *tmp;
+
+	for (size_t i = 0; i < sizeof (heads) / sizeof (heads[0]); ++i) {
+		LL_FOREACH_SAFE(heads[i], entry, tmp)
+			taskentry_free(entry);
+	}
+}