view sciworkerd/sciworkerd.c @ 85:cf49ab595e2e default tip @

sciworkerd: avoid spawning several tasks
author David Demelier <markand@malikania.fr>
date Thu, 09 Mar 2023 10:43:48 +0100
parents f3fc80a2bed7
children
line wrap: on
line source

/*
 * sciworkerd.c -- main sciworkerd file
 *
 * Copyright (c) 2021-2023 David Demelier <markand@malikania.fr>
 *
 * Permission to use, copy, modify, and/or distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include <errno.h>
#include <poll.h>
#include <signal.h>
#include <stdint.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

#include <utlist.h>

#include "apic.h"
#include "log.h"
#include "sciworkerd.h"
#include "task.h"
#include "util.h"

/* Prefix prepended to the log messages emitted from this file. */
#define TAG "sciworkerd: "

/*
 * One job known to the worker. Entries are linked (through `next`, using the
 * utlist LL_* macros) into one of the pending, running or finished lists.
 */
struct taskentry {
	/* Job identifier, as parsed from the "id" field of a fetched job. */
	intmax_t job_id;
	/* Copy of the job tag, duplicated with util_strdup() in queue(). */
	char *tag;
	/* Copy of the project name, duplicated with util_strdup() in queue(). */
	char *project_name;
	/* Task handle created with task_new() in queue(). */
	struct task *task;
	/* Next entry in the singly linked list. */
	struct taskentry *next;
};

/* Jobs queued but not started yet. */
static struct taskentry *taskpending;
/* Jobs whose task has been started and is being polled. */
static struct taskentry *tasks;
/* Jobs whose task is over and whose result still has to be published. */
static struct taskentry *taskfinished;
/*
 * Main loop flag. It is cleared from the stop() signal handler, hence the
 * volatile sig_atomic_t type: it is the only object type that may portably
 * be written from a handler and read from the interrupted code.
 */
static volatile sig_atomic_t run = 1;

/* Global worker configuration with its default values. */
struct sciworkerd sciworkerd = {
	/* Minimum delay between two job fetches; compared to difftime() in fetch_jobs(), so in seconds. */
	.fetchinterval = 300,
	/* Upper bound on the number of entries in the running list, see start_all(). */
	.maxjobs = 4,
	/* Limit compared to difftime() in ghost_all() before a task is killed, so in seconds. */
	.timeout = 3600
};

/*
 * Release a task entry and everything it owns.
 *
 * If the task is still reported as running it is killed and, only when the
 * kill succeeded, waited for. The entry is not unlinked from any list; that
 * is up to the caller.
 */
static inline void
taskentry_free(struct taskentry *entry)
{
	if (task_status(entry->task) == TASKSTATUS_RUNNING) {
		if (task_kill(entry->task) == 0)
			task_wait(entry->task);
	}

	task_free(entry->task);

	/* Both strings are private copies made with util_strdup() in queue(). */
	free(entry->tag);
	free(entry->project_name);
	free(entry);
}

/*
 * Signal handler installed for SIGINT and SIGTERM by sciworkerd_init(): log
 * the signal number and ask the main loop to terminate.
 *
 * NOTE(review): log_info() runs in signal context here; confirm that it is
 * async-signal-safe or move the logging out of the handler.
 */
static void
stop(int signo)
{
	log_info(TAG "exiting on signal %d", signo);
	run = 0;
}

static inline int
pending(intmax_t job_id)
{
	const struct taskentry *iter;

	/* Check if that task is listed for build. */
	LL_FOREACH(taskpending, iter)
		if (iter->job_id == job_id)
			return 1;

	/*
	 * But also make sure we don't add tasks that are already running as
	 * well.
	 */
	LL_FOREACH(tasks, iter)
		if (iter->job_id == job_id)
			return 1;

	return 0;
}

/*
 * Create a new task entry and append it to the pending list.
 *
 * `tag` and `project_name` are duplicated, so the caller keeps ownership of
 * its own strings. The task handle is created from `tag` with task_new().
 */
static inline void
queue(intmax_t id, const char *tag, const char *project_name)
{
	struct taskentry *tk;

	/* id is an intmax_t: it must be printed with %jd, not %d. */
	log_info(TAG "queued job build (%jd) for tag %s", id, tag);

	tk = util_calloc(1, sizeof (*tk));
	tk->job_id = id;
	tk->tag = util_strdup(tag);
	tk->project_name = util_strdup(project_name);
	tk->task = task_new(tag);
	LL_APPEND(taskpending, tk);
}

/*
 * Queue every job of the JSON array `jobs` that is not tracked yet.
 *
 * Each element must be an object with an integer "id" and string "tag" and
 * "project_name" members; elements that fail to unpack are skipped with a
 * warning. The number of newly queued jobs is logged when non-zero.
 */
static void
merge(json_t *jobs)
{
	size_t index, added = 0;
	json_t *job;

	json_array_foreach(jobs, index, job) {
		json_int_t id;
		const char *tag, *project_name;
		int rc;

		rc = json_unpack(job, "{sI ss ss}",
			"id",           &id,
			"tag",          &tag,
			"project_name", &project_name
		);

		if (rc < 0) {
			log_warn(TAG "unable to parse job");
			continue;
		}

		/* Skip jobs that are already pending or running. */
		if (pending(id))
			continue;

		queue(id, tag, project_name);
		added++;
	}

	if (added > 0)
		log_info(TAG "added %zu new pending tasks", added);
}

/*
 * Ask the API for jobs to do, at most once every sciworkerd.fetchinterval
 * seconds.
 *
 * The time of the last attempt is kept in a static variable that starts at
 * zero, and it is updated whether the request succeeds or not.
 */
static void
fetch_jobs(void)
{
	static time_t last;
	const time_t now = time(NULL);
	struct apic req;
	json_t *jobs;

	/* Not enough time elapsed since the previous attempt. */
	if (difftime(now, last) < sciworkerd.fetchinterval)
		return;

	last = now;
	log_debug(TAG "fetching jobs");

	jobs = apic_job_todo(&req, sciworkerd.name);

	if (jobs == NULL) {
		log_warn(TAG "unable to fetch jobs: %s", req.error);
		return;
	}

	merge(jobs);
	json_decref(jobs);
}

static inline size_t
count(const struct taskentry *head)
{
	const struct taskentry *iter;
	size_t tot = 0;

	LL_FOREACH(head, iter)
		tot++;

	return tot;
}

/*
 * Start the task of `entry`.
 *
 * The project is looked up by name through the API, its "script" member is
 * handed to task_setup() and the task is then spawned with task_start().
 *
 * Returns 0 when the task has been spawned, -1 on any failure (a warning is
 * logged in that case).
 */
static int
start(struct taskentry *entry)
{
	struct apic req;
	json_t *project;
	const char *script;
	pid_t pid;
	int ret = -1;

	project = apic_project_find(&req, entry->project_name);

	if (project == NULL) {
		log_warn(TAG "unable to fetch project, dropping task");
		return -1;
	}

	if (json_unpack(project, "{ss}", "script", &script) < 0) {
		json_decref(project);
		log_warn(TAG "invalid project JSON object");
		return -1;
	}

	if (task_setup(entry->task, script) < 0) {
		log_warn(TAG "unable to setup script code: %s, dropping task", strerror(errno));
		goto out;
	}

	pid = task_start(entry->task);

	if (pid < 0) {
		log_warn(TAG "unable to spawn task process: %s", strerror(errno));
		goto out;
	}

	log_info(TAG "task %lld spawned", (long long int)pid);
	ret = 0;

out:
	/* `script` points into `project`, so release the document last. */
	json_decref(project);

	return ret;
}

/*
 * Unlink `entry` from the pending list and destroy it, together with its
 * task handle and the strings it owns.
 */
static inline void
delete(struct taskentry *entry)
{
	LL_DELETE(taskpending, entry);
	task_free(entry->task);

	/* Both strings are private copies made with util_strdup() in queue(). */
	free(entry->tag);
	free(entry->project_name);
	free(entry);
}

/*
 * Start pending tasks, oldest first, until sciworkerd.maxjobs tasks are
 * running or the pending list is empty.
 *
 * An entry that fails to start is dropped from the pending list; one that
 * starts is moved to the running list.
 */
static void
start_all(void)
{
	size_t active = count(tasks);
	struct taskentry *head;

	while (active < sciworkerd.maxjobs) {
		if ((head = taskpending) == NULL)
			break;

		if (start(head) < 0) {
			delete(head);
			continue;
		}

		LL_DELETE(taskpending, head);
		LL_APPEND(tasks, head);
		active++;
	}
}

/*
 * Poll the running tasks once and collect those that are over.
 *
 * One pollfd per running task is filled by task_prepare(), poll() waits up
 * to 5 seconds, then task_sync() is called on every task with its pollfd.
 * Tasks that reached EOF or failed are waited for and moved to the finished
 * list. When no task is running the function just sleeps for one second.
 */
static void
process_all(void)
{
	struct taskentry *iter, *next;
	struct taskcode code;
	struct pollfd *fds;
	size_t fdsz, i = 0;
	pid_t pid;
	int ret;

	/*
	 * Count every pipe. If there is no job we can just wait a little bit.
	 */
	if (!(fdsz = count(tasks))) {
		sleep(1);
		return;
	}

	fds = util_calloc(fdsz, sizeof (*fds));

	for (iter = tasks; iter; iter = iter->next)
		task_prepare(iter->task, &fds[i++]);

	/*
	 * Being interrupted by a signal (typically SIGINT/SIGTERM handled by
	 * stop()) is expected and not worth a warning.
	 */
	if (poll(fds, fdsz, 5000) < 0 && errno != EINTR)
		log_warn(TAG "poll: %s", strerror(errno));

	for (iter = tasks, i = 0; i < fdsz; ++i) {
		/* Saved first because iter may be moved to another list below. */
		next = iter->next;

		/*
		 *  0: EOF         [wait]
		 * -1: error       [kill + wait]
		 * >0: keep going  [nothing]
		 */
		if ((ret = task_sync(iter->task, &fds[i])) < 0) {
			log_warn(TAG "pipe error: %s, killing task", strerror(errno));

			if (task_kill(iter->task) < 0)
				log_warn(TAG "task kill error: %s", strerror(errno));
		}

		/* Now wait for the task to complete. */
		if (ret <= 0) {
			pid = task_pid(iter->task);

			if (task_wait(iter->task) < 0)
				log_warn(TAG "task wait error: %s", strerror(errno));
			else {
				code = task_code(iter->task);

				switch (task_status(iter->task)) {
				case TASKSTATUS_EXITED:
					log_info(TAG "task %lld exited with code %d",
					    (long long int)pid, code.exitcode);
					break;
				case TASKSTATUS_KILLED:
					log_info(TAG "task %lld killed with signal %d",
					    (long long int)pid, code.sigcode);
					break;
				default:
					break;
				}
			}

			/* Remove that task and push to the outgoing queue. */
			LL_DELETE(tasks, iter);
			LL_APPEND(taskfinished, iter);
		}

		iter = next;
	}

	free(fds);
}

/*
 * Kill every running task for which the difference between the current time
 * and task_uptime() has reached sciworkerd.timeout, and move it to the
 * finished list.
 */
static void
ghost_all(void)
{
	struct taskentry *entry, *next;

	LL_FOREACH_SAFE(tasks, entry, next) {
		const double elapsed = difftime(time(NULL), task_uptime(entry->task));

		if (elapsed >= sciworkerd.timeout) {
			log_info(TAG "task timeout, killing");

			/* Only wait when the kill succeeded, otherwise we could block forever. */
			if (task_kill(entry->task) == 0)
				task_wait(entry->task);

			LL_DELETE(tasks, entry);
			LL_APPEND(taskfinished, entry);
		}
	}
}

static int
publish(struct taskentry *iter)
{
	struct apic req;
	json_t *obj;
	int ret = 0; 

	obj = json_pack("{sI ss ss si si}",
		"job_id",       iter->job_id,
		"worker_name",  sciworkerd.name,
		"console",      task_console(iter->task),
		"exitcode",     task_code(iter->task).exitcode,
		"sigcode",      task_code(iter->task).sigcode
	);

	if (apic_jobresult_add(&req, obj) < 0) {
		log_warn(TAG "unable to publish task: %s", req.error);
		ret = -1;
	}

	json_decref(obj);

	return ret;
}

/*
 * Try to publish every finished task. Entries that were published are
 * unlinked and destroyed; the others stay in the finished list and are
 * attempted again on the next call.
 */
static void
publish_all(void)
{
	struct taskentry *entry, *next;

	LL_FOREACH_SAFE(taskfinished, entry, next) {
		if (publish(entry) != 0)
			continue;

		LL_DELETE(taskfinished, entry);
		taskentry_free(entry);
	}
}

/*
 * Initialize the daemon: open the log, make sure a worker name has been
 * configured and install stop() as the SIGINT and SIGTERM handler.
 *
 * Any failure is reported through log_die().
 */
void
sciworkerd_init(void)
{
	struct sigaction sa = {0};

	/* The identity used to read "sigworkerd", a typo for the program name. */
	log_open("sciworkerd");

	if (strlen(sciworkerd.name) == 0)
		log_die(TAG "no worker name defined");

	sigemptyset(&sa.sa_mask);
	sa.sa_handler = stop;

	if (sigaction(SIGINT, &sa, NULL) < 0 || sigaction(SIGTERM, &sa, NULL) < 0)
		log_die(TAG "sigaction: %s", strerror(errno));
}

/*
 * Main loop. Until the `run` flag is cleared, each iteration fetches new
 * jobs, starts pending tasks, polls the running ones, kills those that
 * timed out and publishes the finished ones, in that order.
 */
void
sciworkerd_run(void)
{
	for (;;) {
		if (!run)
			break;

		fetch_jobs();
		start_all();
		process_all();
		ghost_all();
		publish_all();
	}
}

void
sciworkerd_finish(void)
{
	struct taskentry *iter, *tmp;

	LL_FOREACH_SAFE(taskpending, iter, tmp)
		taskentry_free(iter);
	LL_FOREACH_SAFE(tasks, iter, tmp) {
		task_kill(iter->task);
		taskentry_free(iter);
	}
	LL_FOREACH_SAFE(taskfinished, iter, tmp)
		taskentry_free(iter);
}