view sciworkerd/sciworkerd.c @ 19:de4bf839b565

misc: revamp SQL
author David Demelier <markand@malikania.fr>
date Fri, 15 Jul 2022 11:11:48 +0200
parents
children f98ea578b1ef
line wrap: on
line source

#include <errno.h>
#include <poll.h>
#include <signal.h>
#include <string.h>
#include <time.h>

#include <utlist.h>

#include "apic.h"
#include "log.h"
#include "sciworkerd.h"
#include "task.h"
#include "types.h"
#include "util.h"

/* Prefix used for log messages emitted from this file. */
#define TAG "sciworkerd: "

/*
 * Node stored in the task queues of this file (taskpending, tasks and
 * taskfinished).
 */
struct taskentry {
	/* Task handle, created with task_new() when the job is queued. */
	struct task *task;
	/* Copy of the job description this entry was queued for. */
	struct job job;
	/* Next entry in the queue, maintained through the LL_* list macros. */
	struct taskentry *next;
};

/* Jobs that were queued but whose task has not been started yet. */
static struct taskentry *taskpending;

/* Entries whose task has been started and is being polled. */
static struct taskentry *tasks;

/* Entries whose task ended (or was killed), waiting to be published. */
static struct taskentry *taskfinished;

/* Information about this worker, filled by fetch_worker(). */
static struct worker worker;

/*
 * Main loop flag. It is cleared from the signal handler, so it must be a
 * volatile sig_atomic_t for the store to be well defined and visible to
 * the loop in sciworkerd_run().
 */
static volatile sig_atomic_t run = 1;

/*
 * Daemon settings and their initial values.
 */
struct sciworkerd sciworkerd = {
	/* Seconds that must elapse between two job fetches (see fetch_jobs). */
	.fetchinterval = 300,
	/* NOTE(review): presumably the maximum number of tasks running at once -- confirm. */
	.maxjobs = 4,
	/* Seconds a task may run before ghost_all() kills it. */
	.timeout = 600
};

/*
 * Destroy an entry and the task it owns.
 *
 * A task that is still running is killed first; it is only waited for when
 * the kill request succeeded.
 */
static inline void
taskentry_free(struct taskentry *entry)
{
	struct task *tk = entry->task;

	if (task_status(tk) == TASKSTATUS_RUNNING && task_kill(tk) == 0)
		task_wait(tk);

	task_free(tk);
	free(entry);
}

/*
 * Signal handler: request the main loop to terminate.
 *
 * Only the flag is touched here. Logging from a signal handler is not
 * async-signal-safe, so no message is emitted from this context.
 */
static void
stop(int sign)
{
	(void)sign;

	run = 0;
}

static inline int
pending(int id)
{
	const struct taskentry *iter;

	LL_FOREACH(taskpending, iter)
		if (iter->job.id == id)
			return 1;

	return 0;
}

/*
 * Create a new entry for the given job and append it to the pending queue.
 *
 * The job is copied into the entry, the caller keeps ownership of its own
 * structure.
 */
static inline void
queue(const struct job *job)
{
	struct taskentry *entry;

	log_info(TAG "queued job build (%d) for tag %s\n", job->id, job->tag);

	entry = util_calloc(1, sizeof (*entry));
	entry->job = *job;
	entry->task = task_new(job->tag);

	LL_APPEND(taskpending, entry);
}

/*
 * Parse the jobs contained in the JSON document and queue every one that
 * pending() does not report as already known.
 */
static void
merge(json_t *doc)
{
	struct job list[SCI_JOB_LIST_MAX];
	size_t added = 0;
	ssize_t nread;

	nread = job_from(list, UTIL_SIZE(list), doc);

	if (nread < 0) {
		log_warn(TAG "error while parsing jobs: %s", strerror(errno));
		return;
	}

	for (ssize_t i = 0; i < nread; ++i) {
		if (pending(list[i].id))
			continue;

		queue(&list[i]);
		added++;
	}

	log_info(TAG "added %zu new pending tasks", added);
}

/*
 * Ask the API for jobs, at most once every sciworkerd.fetchinterval seconds.
 *
 * The reference time is taken on the first call, so the first request is
 * only sent once a whole interval has elapsed since that call.
 */
static void
fetch_jobs(void)
{
	static time_t last;
	struct apicreq req;
	time_t now;

	if (!last)
		last = time(NULL);

	now = time(NULL);

	/* Interval not elapsed yet, nothing to do. */
	if (difftime(now, last) < sciworkerd.fetchinterval)
		return;

	last = now;

	if (apic_get(&req, "%s/api/v1/%s", sciworkerd.url, sciworkerd.name) < 0)
		log_warn(TAG "unable to fetch jobs: %s", req.error);

	/* NOTE(review): relies on apic_get() leaving req.doc NULL on failure -- confirm. */
	if (req.doc) {
		merge(req.doc);
		json_decref(req.doc);
	}
}

/*
 * Fetch information about myself and store it in the file-scope `worker`.
 *
 * The daemon dies if the response is empty or cannot be parsed.
 */
static void
fetch_worker(void)
{
	struct apicreq req;

	if (apic_get(&req, "%s/api/v1/workers/%s", sciworkerd.url, sciworkerd.name) < 0)
		log_warn(TAG "unable to fetch worker info: %s", req.error);
	if (!req.doc)
		log_die(TAG "empty worker response");
	/* The format needs a conversion for the strerror() argument. */
	if (worker_from(&worker, 1, req.doc) < 0)
		log_die(TAG "unable to parse worker: %s", strerror(errno));

	log_info("worker id: %d", worker.id);
	log_info("worker name: %s", worker.name);
	log_info("worker description: %s", worker.desc);

	json_decref(req.doc);
}

/*
 * Fetch information about a project.
 */
static int
fetch_project(struct project *project, int id)
{
	struct apicreq req;

	if (apic_get(&req, "%s/api/v1/projects/%d", id) < 0)
		return log_warn(TAG "unable to fetch project info: %s", req.error), -1;
	if (!req.doc)
		return log_warn(TAG "empty project response"), -1;
	if (project_from(project, 1, req.doc) < 0)
		return log_warn(TAG "unable to parse project: %s", strerror(errno)), -1;

	return 0;
}

static inline size_t
count(const struct taskentry *head)
{
	const struct taskentry *iter;
	size_t tot = 0;

	LL_FOREACH(head, iter)
		tot++;

	return tot;
}

/*
 * Start the task of a queued entry: fetch the project of its job, set the
 * task up with the project script and spawn the process.
 *
 * Returns 0 on success, -1 if any of these steps failed.
 */
static int
start(struct taskentry *entry)
{
	struct project project;
	pid_t pid;

	if (fetch_project(&project, entry->job.project_id) < 0) {
		log_warn(TAG "unable to fetch project, dropping task");
		return -1;
	}

	if (task_setup(entry->task, project.script) < 0) {
		log_warn(TAG "unable to setup script code: %s, dropping task", strerror(errno));
		return -1;
	}

	pid = task_start(entry->task);

	if (pid < 0) {
		log_warn(TAG "unable to spawn task process: %s", strerror(errno));
		return -1;
	}

	log_info(TAG "task %lld spawned", (long long int)pid);

	return 0;
}

/*
 * Unlink an entry from the pending queue and destroy it together with its
 * task.
 */
static inline void
delete(struct taskentry *entry)
{
	struct task *tk;

	LL_DELETE(taskpending, entry);

	tk = entry->task;
	task_free(tk);
	free(entry);
}

static void
start_all(void)
{
	size_t running = count(tasks);
	struct taskentry *entry;

	while (running-- > 0 && (entry = taskpending)) {
		if (start(entry) < 0)
			delete(entry);
		else {
			LL_DELETE(taskpending, entry);
			LL_APPEND(tasks, entry);
		}
	}
}

/*
 * Poll the descriptors of every running task, synchronize their output and
 * move the ones that ended (or failed) to the finished queue.
 *
 * When no task is running the function sleeps for a short while instead of
 * returning at once, otherwise the main loop would spin.
 */
static void
process_all(void)
{
	struct taskentry *iter, *next;
	struct taskcode code;
	struct pollfd *fds;
	size_t fdsz, i = 0;
	int ret;

	if (!(fdsz = count(tasks))) {
		/* Nothing to watch: poll() without descriptors acts as a sleep. */
		(void)poll(NULL, 0, 1000);
		return;
	}

	/* One pollfd per running task, in queue order. */
	fds = util_calloc(fdsz, sizeof (*fds));

	for (iter = tasks; iter; iter = iter->next)
		task_prepare(iter->task, &fds[i++]);

	/* The returned events are meaningless if poll() failed, try again later. */
	if (poll(fds, fdsz, 5000) < 0) {
		log_warn("poll: %s", strerror(errno));
		free(fds);
		return;
	}

	for (iter = tasks, i = 0; iter && i < fdsz; iter = next, ++i) {
		/* The entry may be moved to another queue below, save its successor. */
		next = iter->next;

		/*
		 *  0: EOF         [wait]
		 * -1: error       [kill + wait]
		 * >0: keep going  [nothing]
		 */
		if ((ret = task_sync(iter->task, &fds[i])) > 0)
			continue;

		if (ret < 0) {
			log_warn(TAG "pipe error: %s, killing task", strerror(errno));

			if (task_kill(iter->task) < 0)
				log_warn(TAG "task kill error: %s", strerror(errno));
		}

		/* Now wait for the task to complete. */
		if (task_wait(iter->task) < 0)
			log_warn(TAG "task wait error: %s", strerror(errno));
		else {
			code = task_code(iter->task);

			switch (task_status(iter->task)) {
			case TASKSTATUS_EXITED:
				log_info(TAG "task %lld exited with code %d",
				    (long long int)task_pid(iter->task), code.exitcode);
				break;
			case TASKSTATUS_KILLED:
				log_info(TAG "task %lld killed with signal %d",
				    (long long int)task_pid(iter->task), code.sigcode);
				break;
			default:
				break;
			}
		}

		/* Remove that task and push to the outgoing queue. */
		LL_DELETE(tasks, iter);
		LL_APPEND(taskfinished, iter);
	}

	free(fds);
}

/*
 * Kill all tasks that have been running for too long, that is for at least
 * sciworkerd.timeout seconds, and move them to the finished queue.
 */
static void
ghost_all(void)
{
	struct taskentry *iter, *tmp;

	LL_FOREACH_SAFE(tasks, iter, tmp) {
		if (difftime(time(NULL), task_uptime(iter->task)) < sciworkerd.timeout)
			continue;

		log_info(TAG "task timeout, killing");

		/* Do not attempt to wait if kill failed to avoid lock. */
		if (task_kill(iter->task) == 0)
			task_wait(iter->task);

		LL_DELETE(tasks, iter);
		LL_APPEND(taskfinished, iter);
	}
}

/*
 * Send the result of a finished task (job id, exit code, console log and
 * worker id) to the API.
 *
 * Returns the value of apic_post(): 0 when the result was published.
 */
static int
publish(struct taskentry *iter)
{
	// TODO: add sigcode.
	const struct taskcode code = task_code(iter->task);
	struct jobresult res = {0};
	struct apicreq req;
	json_t *payload;
	int status;

	res.job_id = iter->job.id;
	res.exitcode = code.exitcode;
	res.log = task_console(iter->task);
	res.worker_id = worker.id;

	payload = jobresult_to(&res, 1);
	status = apic_post(&req, payload, "%s/api/v1/jobs", sciworkerd.url);
	json_decref(payload);

	if (status == 0)
		log_info(TAG "task successfully published");
	else
		log_warn(TAG "unable to publish task: %s", req.error);

	return status;
}

static void
publish_all(void)
{
	struct taskentry *iter, *tmp;

	LL_FOREACH_SAFE(taskfinished, iter, tmp) {
		if (publish(iter) == 0) {
			LL_DELETE(taskfinished, iter);
			taskentry_free(iter);
		}
	}
}

/*
 * Initialize the daemon: open the log, install the SIGINT/SIGTERM handler
 * and fetch the information about this worker.
 *
 * The daemon dies if the signal handlers cannot be installed.
 */
void
sciworkerd_init(void)
{
	struct sigaction sa = {0};

	/* Log under the real program name. */
	log_open("sciworkerd");

	sigemptyset(&sa.sa_mask);
	sa.sa_handler = stop;

	if (sigaction(SIGINT, &sa, NULL) < 0 || sigaction(SIGTERM, &sa, NULL) < 0)
		log_die(TAG "sigaction: %s", strerror(errno));

	fetch_worker();
}

/*
 * Main loop, running until the `run` flag is cleared: fetch new jobs, start
 * pending tasks, process the running ones, kill those that timed out and
 * publish the finished ones.
 */
void
sciworkerd_run(void)
{
	while (run) {
		fetch_jobs();
		/* Without this step queued jobs would stay pending forever. */
		start_all();
		process_all();
		ghost_all();
		publish_all();
	}
}

/*
 * Destroy every entry of the three queues. Tasks still running are killed
 * by taskentry_free().
 */
void
sciworkerd_finish(void)
{
	struct taskentry *iter, *tmp;

	LL_FOREACH_SAFE(taskpending, iter, tmp)
		taskentry_free(iter);
	LL_FOREACH_SAFE(tasks, iter, tmp)
		taskentry_free(iter);
	LL_FOREACH_SAFE(taskfinished, iter, tmp)
		taskentry_free(iter);

	/* Do not leave the heads pointing to freed entries. */
	taskpending = tasks = taskfinished = NULL;
}