view sciworkerd/main.c @ 18:600204c31bf0

misc: refactor
author David Demelier <markand@malikania.fr>
date Tue, 12 Jul 2022 20:20:51 +0200
parents sciworkerd.c@40fe70256fb0
children de4bf839b565
line wrap: on
line source

/*
 * main.c -- main sciworkerd(8) program file
 *
 * Copyright (c) 2021 David Demelier <markand@malikania.fr>
 *
 * Permission to use, copy, modify, and/or distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#if 0

#include <sys/queue.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <assert.h>
#include <err.h>
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <poll.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdnoreturn.h>
#include <string.h>
#include <unistd.h>

#include <curl/curl.h>
#include <jansson.h>

#include "config.h"
#include "log.h"
#include "types.h"
#include "util.h"

/* Size in bytes of the task::job_tag buffer (terminator included). */
#define TAG_MAX 256

/*
 * One job tracked by the worker, from the moment it is queued until its result
 * has been uploaded. Descriptor fields use 0 as the "not open" marker (see
 * destroy()).
 */
struct task {
	enum taskst status;             /* current state (TASKST_*) */
	pid_t child;                    /* pid matched by find_by_pid() */
	int pipe[2];                    /* [0] is polled and read for the child output */
	int exitcode;                   /* si_status reported on SIGCHLD */
	int job_id;                     /* job identifier, also used to detect duplicates */
	int project_id;                 /* project whose script is fetched */
	char job_tag[TAG_MAX];          /* tag copied from the job description */
	char out[SCI_CONSOLE_MAX];      /* accumulated child output, uploaded as the log */
	char script[PATH_MAX];          /* path of the temporary script file */
	int scriptfd;                   /* descriptor returned by mkstemp() on script */
	TAILQ_ENTRY(task) link;         /* linkage in the global tasks queue */
};

/* Tail queue head type holding struct task elements. */
TAILQ_HEAD(tasks, task);

/*
 * Dynamically allocated array of poll(2) descriptors, built by prepare() and
 * consumed by readall().
 */
struct fds {
	struct pollfd *list;    /* array of listsz entries */
	size_t listsz;          /* number of entries in list */
};

/*
 * Message buffer paired with a stdio stream.
 *
 * NOTE(review): nothing in the visible code references this structure; it
 * looks like a leftover -- confirm before removing.
 */
struct fetch {
	char buf[SCI_MSG_MAX];  /* raw message storage */
	FILE *bufp;             /* presumably a stream opened over buf -- TODO confirm */
};

/*
 * Runtime configuration, overridable from the command line in main()
 * (-u, -w and -m respectively).
 */
static struct {
	char *url;              /* base URL prepended by makeurl() */
	char *worker;           /* worker name used in the API requests */
	int maxbuilds;          /* upper bound on simultaneously running tasks */
} config = {
	.url = "http://localhost",
	.worker = "default",
	.maxbuilds = 4
};

/* Every task known to the daemon, in queueing order. */
static struct tasks tasks = TAILQ_HEAD_INITIALIZER(tasks);

/* Description of this worker, filled in by fetchworker(). */
static struct worker worker;

/*
 * Main loop flag. It is cleared from the stop() signal handler, hence the
 * volatile sig_atomic_t type: this is the object type the C standard
 * guarantees can be safely written from an asynchronous handler.
 */
static volatile sig_atomic_t alive = 1;

/*
 * Print the command line synopsis on stderr and terminate the process with
 * exit code 1. Never returns.
 */
noreturn static void
usage(void)
{
	const char *progname = getprogname();

	fprintf(stderr, "usage: %s [-m maxbuild] [-u url] [-w worker]\n", progname);
	exit(1);
}

/*
 * Find the task whose output pipe (pipe[0]) is the given file descriptor.
 *
 * Returns the matching task or NULL if no task owns that descriptor.
 */
static inline struct task *
find_by_fd(int fd)
{
	struct task *iter;

	for (iter = TAILQ_FIRST(&tasks); iter; iter = TAILQ_NEXT(iter, link)) {
		if (iter->pipe[0] == fd)
			break;
	}

	/* iter is NULL when the whole queue was walked without a match. */
	return iter;
}

/*
 * Find the task whose child process has the given pid.
 *
 * Returns the matching task or NULL if none is associated with that pid.
 */
static inline struct task *
find_by_pid(pid_t pid)
{
	struct task *iter = TAILQ_FIRST(&tasks);

	while (iter && iter->child != pid)
		iter = TAILQ_NEXT(iter, link);

	return iter;
}

/*
 * Release everything owned by a task: remove its temporary script, close the
 * descriptors that are still open, unlink it from the global queue and free
 * its memory. The pointer must not be used afterwards.
 *
 * Descriptors equal to 0 are considered "not open" and are left alone.
 */
static void
destroy(struct task *tk)
{
	log_debug("destroying task %d", tk->job_id);
	unlink(tk->script);

	for (size_t i = 0; i < 2; ++i) {
		if (tk->pipe[i] != 0)
			close(tk->pipe[i]);
	}

	if (tk->scriptfd != 0)
		close(tk->scriptfd);

	TAILQ_REMOVE(&tasks, tk, link);

	/* Scrub the structure before handing the memory back. */
	memset(tk, 0, sizeof (*tk));
	free(tk);
}

/*
 * Build an absolute URL from the configured base URL and a printf(3) style
 * page specification.
 *
 * The page is formatted into a 128 bytes buffer and the final URL into a 256
 * bytes one; longer results are silently truncated by (v)snprintf.
 *
 * Returns a pointer to a static buffer which is overwritten by the next call,
 * so the result must be consumed (or copied) before calling makeurl() again.
 *
 * NOTE(review): va_list/va_start need <stdarg.h>, which is not part of the
 * visible include list -- confirm it is provided by another header.
 */
static const char *
makeurl(const char *fmt, ...)
{
	static char url[256];
	char page[128] = {0};
	va_list args;

	assert(fmt);

	va_start(args, fmt);
	vsnprintf(page, sizeof (page), fmt, args);
	va_end(args);

	snprintf(url, sizeof (url), "%s/%s", config.url, page);

	return url;
}

/*
 * SIGCHLD handler (installed with SA_SIGINFO in init()).
 *
 * Reaps the child that triggered the signal and, if it belongs to a known
 * task, closes pipe[1], marks the task as TASKST_COMPLETED and records
 * si_status as its exit code.
 *
 * NOTE(review): this handler calls logging and strerror(3), which are not
 * async-signal-safe functions -- confirm this is acceptable.
 */
static void
complete(int signum, siginfo_t *sinfo, void *ctx)
{
	struct task *tk;
	const pid_t pid = sinfo->si_pid;

	(void)signum;
	(void)ctx;

	if (waitpid(pid, NULL, 0) < 0)
		log_warn("waitpid: %s", strerror(errno));

	tk = find_by_pid(pid);

	/* Not one of our tasks, nothing to update. */
	if (!tk)
		return;

	log_debug("process %d terminated (exitcode=%d)",
	    (int)pid, sinfo->si_status);

	close(tk->pipe[1]);
	tk->status = TASKST_COMPLETED;
	tk->exitcode = sinfo->si_status;

	/* 0 marks the descriptor as closed for destroy(). */
	tk->pipe[1] = 0;
}

/*
 * SIGTERM/SIGINT handler: log the signal number and clear the `alive' flag so
 * that the loop in main() terminates after the current iteration.
 */
static void
stop(int signum)
{
	log_warn("exiting on signal %d", signum);

	/* Picked up by the `while (alive)' loop in main(). */
	alive = 0;
}

/*
 * Encode the result of a task (job id, exit code, console output and worker
 * id) as a compact JSON string.
 *
 * Returns the string produced by json_dumps(); the caller releases it with
 * free(3) as upload() does.
 */
static char *
uploadenc(const struct task *tk)
{
	struct jobresult res = {0};
	json_t *doc;
	char *encoded;

	res.worker_id = worker.id;
	res.job_id = tk->job_id;
	res.exitcode = tk->exitcode;
	res.log = tk->out;

	doc = jobresult_to(&res, 1);
	encoded = json_dumps(doc, JSON_COMPACT);

	/* The dumped string is independent from the document. */
	json_decref(doc);

	return encoded;
}

/*
 * libcurl write callback used by get(): append the received chunk to the
 * stream passed as user data.
 *
 * Returns w when the whole chunk was written, otherwise logs a warning and
 * returns 0.
 */
static size_t
getcb(char *in, size_t n, size_t w, FILE *fp)
{
	const size_t written = fwrite(in, n, w, fp);

	if (written == w)
		return w;

	log_warn("get: %s", strerror(errno));

	return 0;
}

/*
 * Perform a HTTP GET request on `url' and parse the response body as JSON.
 *
 * topic: short label used as prefix in warning messages.
 * url:   absolute URL to fetch.
 *
 * Returns a new JSON document that the caller must release with
 * json_decref(), or NULL (after logging a warning) if the transfer failed,
 * the status code is not 200 or the body is not valid JSON. Exits the process
 * if the in-memory stream cannot be created.
 */
static json_t *
get(const char *topic, const char *url)
{
	CURL *curl;
	CURLcode code;

	json_t *doc;
	json_error_t error;

	char buf[SCI_MSG_MAX] = {0};
	long status = 0;
	FILE *fp;

	if (!(curl = curl_easy_init()))
		return log_warn("%s: unable to initialize curl", topic), NULL;

	/*
	 * Hide the last byte of buf from the stream: as buf is zero initialized
	 * this guarantees a terminated string for json_loads() even when the
	 * response fills the whole stream.
	 */
	if (!(fp = fmemopen(buf, sizeof (buf) - 1, "w")))
		err(1, "fmemopen");

	curl_easy_setopt(curl, CURLOPT_URL, url);
	curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
	curl_easy_setopt(curl, CURLOPT_TIMEOUT, 3L);
	curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, getcb);
	curl_easy_setopt(curl, CURLOPT_WRITEDATA, fp);

	code = curl_easy_perform(curl);

	curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status);
	curl_easy_cleanup(curl);

	/* Flush pending data into buf before parsing it. */
	fclose(fp);

	if (code != CURLE_OK)
		return log_warn("%s: %s", topic, curl_easy_strerror(code)), NULL;
	if (status != 200)
		return log_warn("%s: unexpected status code %ld", topic, status), NULL;
	if (!(doc = json_loads(buf, 0, &error)))
		return log_warn("%s: %s", topic, error.text), NULL;

	return doc;
}

/*
 * libcurl write callback that discards the response body.
 *
 * in:   received data (ignored).
 * n:    size of one item.
 * w:    number of items.
 * data: user pointer (ignored).
 *
 * Returns the number of bytes "consumed", that is n * w, so that libcurl
 * considers the chunk as fully handled and does not abort the transfer.
 */
static size_t
silent(char *in, size_t n, size_t w, void *data)
{
	(void)in;
	(void)data;

	return n * w;
}

/*
 * Send the result of a task to the server as a JSON POST request.
 *
 * The task is put into TASKST_SYNCING state first, so that any failure
 * (encoding, transfer or unexpected status code) leaves it queued for a new
 * attempt without running the job again. On success (status code 200) the
 * task is destroyed and `tk' must not be used anymore by the caller.
 */
static void
upload(struct task *tk)
{
	CURL *curl;
	CURLcode code;
	struct curl_slist *headers = NULL;
	long status = 0;
	char *dump;

	tk->status = TASKST_SYNCING;

	if (!(dump = uploadenc(tk))) {
		log_warn("upload: unable to encode job result");
		return;
	}
	if (!(curl = curl_easy_init())) {
		log_warn("upload: unable to initialize curl");
		free(dump);
		return;
	}

	headers = curl_slist_append(headers, "Content-Type: application/json");
	curl_easy_setopt(curl, CURLOPT_URL, makeurl("api/v1/jobs"));
	curl_easy_setopt(curl, CURLOPT_TIMEOUT, 3L);
	curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
	curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, silent);
	curl_easy_setopt(curl, CURLOPT_POSTFIELDS, dump);
	/* This option takes a long, passing a size_t through varargs is wrong. */
	curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, (long)strlen(dump));
	curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
	code = curl_easy_perform(curl);
	curl_slist_free_all(headers);

	if (code != CURLE_OK)
		log_warn("upload: %s", curl_easy_strerror(code));
	else {
		curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status);

		if (status != 200)
			log_warn("upload: unexpected return code: %ld", status);
		else
			destroy(tk);
	}

	free(dump);
	curl_easy_cleanup(curl);
}

/*
 * Tell whether a job is already present in the task queue, whatever its
 * state.
 *
 * Returns 1 if a task with this job id exists, 0 otherwise.
 */
static inline int
pending(int id)
{
	struct task *iter = TAILQ_FIRST(&tasks);

	while (iter && iter->job_id != id)
		iter = TAILQ_NEXT(iter, link);

	return iter != NULL;
}

/*
 * Allocate a zeroed task for the given job and append it to the task queue.
 *
 * id:         job identifier.
 * project_id: project the job belongs to.
 * tag:        job tag, copied (and truncated if needed) into the task.
 */
static void
queue(int id, int project_id, const char *tag)
{
	struct task *created;

	log_info("queued job build (%d) for tag %s\n", id, tag);

	/* Every field not set below stays zero, including the status. */
	created = util_calloc(1, sizeof (*created));
	created->project_id = project_id;
	created->job_id = id;
	strlcpy(created->job_tag, tag, sizeof (created->job_tag));

	TAILQ_INSERT_TAIL(&tasks, created, link);
}

/*
 * Decode the job list contained in `doc' and queue every job that is not
 * already known. The document is always released before returning.
 */
static void
merge(json_t *doc)
{
	struct job jobs[SCI_JOB_LIST_MAX];
	const ssize_t count = job_from(jobs, UTIL_SIZE(jobs), doc);

	if (count < 0) {
		log_warn("fetchjobs: %s", strerror(errno));
		json_decref(doc);
		return;
	}

	for (ssize_t i = 0; i < count; ++i) {
		const struct job *job = &jobs[i];

		/* Already queued or running, do not duplicate it. */
		if (pending(job->id))
			continue;

		queue(job->id, job->project_id, job->tag);
	}

	json_decref(doc);
}

/*
 * Ask the server for the jobs assigned to this worker and merge them into the
 * task queue. A failed request only produces a warning.
 */
static void
fetchjobs(void)
{
	json_t *doc = get("fetch", makeurl("api/v1/jobs/%s", config.worker));

	if (doc)
		merge(doc);
	else
		log_warn("unable to retrieve jobs");
}

/*
 * Collect the output of the children.
 *
 * For every polled descriptor that reported an event, read one chunk from
 * the pipe and append it to the console buffer of the owning task. When the
 * read returns end of file or an error, the task is switched to
 * TASKST_SYNCING so that flushall() uploads its result.
 */
static void
readall(struct fds *fds)
{
	char buf[BUFSIZ];

	for (size_t i = 0; i < fds->listsz; ++i) {
		const struct pollfd *pfd = &fds->list[i];
		struct task *tk;
		ssize_t nr;

		if (pfd->revents == 0)
			continue;

		tk = find_by_fd(pfd->fd);
		if (!tk)
			continue;

		/* One byte is kept for the terminator required by strlcat. */
		nr = read(pfd->fd, buf, sizeof (buf) - 1);

		if (nr > 0) {
			buf[nr] = 0;
			strlcat(tk->out, buf, sizeof (tk->out));
		} else
			tk->status = TASKST_SYNCING;
	}
}

/*
 * Retrieve status code from spawned process complete or upload again if they
 * failed to sync.
 */
static void
flushall(void)
{
	struct task *tk, *tmp;

	TAILQ_FOREACH_SAFE(tk, &tasks, link, tmp)
		if (tk->status == TASKST_SYNCING)
			upload(tk);
}

/*
 * Decode the project contained in `doc' and write its script into the
 * temporary file of the task.
 *
 * On success the script descriptor is closed (and reset to 0) so that the
 * file can be executed. The JSON document is released in every case.
 *
 * Returns 0 on success, -1 (after logging a warning) otherwise.
 */
static int
extract(struct task *tk, json_t *doc)
{
	struct project proj;
	size_t len;
	ssize_t nw;
	int ret = -1;

	if (project_from(&proj, 1, doc) < 0) {
		log_warn("fetchproject: %s", strerror(errno));
		goto out;
	}

	len = strlen(proj.script);
	nw = write(tk->scriptfd, proj.script, len);

	if (nw < 0 || (size_t)nw != len) {
		/* errno is only meaningful when write(2) itself failed. */
		log_warn("fetchproject: %s", nw < 0 ? strerror(errno) : "short write");
		goto out;
	}

	/* Close so we can finally spawn it. */
	close(tk->scriptfd);
	tk->scriptfd = 0;
	ret = 0;

out:
	json_decref(doc);

	return ret;
}

/*
 * Download the project of a task and store its script into the task's
 * temporary file.
 *
 * Returns 0 on success, -1 if the request or the extraction failed.
 */
static int
fetchproject(struct task *tk)
{
	json_t *doc = get("fetchproject", makeurl("api/v1/projects/%d", tk->project_id));

	return doc ? extract(tk, doc) : -1;
}

/*
 * Create a task to run the script. This retrieves the project script code at
 * this moment, stores it in an executable temporary file and spawns it.
 *
 * Nothing happens unless the task is in TASKST_PENDING state. On any failure
 * the temporary file is removed, its descriptor is closed and reset to 0 and
 * the task is left untouched so that it can be tried again later.
 *
 * NOTE(review): spawn() is neither defined nor declared in the visible part
 * of the file -- confirm where it comes from.
 */
static void
createtask(struct task *tk)
{
	int saved;

	if (tk->status != TASKST_PENDING)
		return;

	log_debug("creating task (id=%d, tag=%s)", tk->job_id, tk->job_tag);
	snprintf(tk->script, sizeof (tk->script), "/tmp/sciworkerd-%d-XXXXXX", tk->job_id);

	if ((tk->scriptfd = mkstemp(tk->script)) < 0) {
		log_warn("%s", strerror(errno));

		/* Do not let destroy() try to close an invalid descriptor. */
		tk->scriptfd = 0;
		return;
	}

	if (fchmod(tk->scriptfd, S_IRUSR | S_IWUSR | S_IXUSR) < 0) {
		/* Cleanup calls below may overwrite errno. */
		saved = errno;
		unlink(tk->script);
		close(tk->scriptfd);
		tk->scriptfd = 0;
		log_warn("%s", strerror(saved));
		return;
	}

	if (fetchproject(tk) < 0) {
		unlink(tk->script);
		close(tk->scriptfd);
		tk->scriptfd = 0;
	} else
		spawn(tk);
}

/*
 * Start all pending tasks if the limit of running tasks is not reached.
 */
static void
startall(void)
{
	size_t nrunning = 0;
	struct task *tk;

	TAILQ_FOREACH(tk, &tasks, link)
		if (tk->status == TASKST_RUNNING)
			++nrunning;

	if (nrunning >= (size_t)config.maxbuilds)
		log_debug("not spawning new process because limit is reached");
	else {
		tk = TAILQ_FIRST(&tasks);

		while (tk && nrunning++ < (size_t)config.maxbuilds) {
			createtask(tk);
			tk = TAILQ_NEXT(tk, link);
		}
	}
}

/*
 * Retrieve the description of the configured worker from the server and
 * store it in the global `worker'. Exits the process if it cannot be
 * obtained or decoded.
 */
static void
fetchworker(void)
{
	json_t *doc = get("fetchworker", makeurl("api/v1/workers/%s", config.worker));

	if (!doc || worker_from(&worker, 1, doc) < 0)
		errx(1, "unable to retrieve worker id");

	log_info("worker id: %d", worker.id);
	log_info("worker name: %s", worker.name);
	log_info("worker description: %s", worker.desc);

	/* Released only once the fields have been logged. */
	json_decref(doc);
}

static void
init(void)
{
	struct sigaction sa;

	sa.sa_flags = SA_SIGINFO | SA_RESTART;
	sa.sa_sigaction = complete;
	sigemptyset(&sa.sa_mask);

	if (sigaction(SIGCHLD, &sa, NULL) < 0)
		err(1, "sigaction");

	sa.sa_flags = SA_RESTART;
	sa.sa_handler = stop;
	sigemptyset(&sa.sa_mask);

	if (sigaction(SIGTERM, &sa, NULL) < 0 || sigaction(SIGINT, &sa, NULL) < 0)
		err(1, "sigaction");

	log_open("sciworkerd");
	fetchworker();
}

/*
 * Build the poll(2) set: one entry (pipe[0], POLLIN | POLLPRI) for every task
 * in TASKST_RUNNING or TASKST_COMPLETED state.
 *
 * Returns the set by value; its `list' member is allocated with
 * util_calloc() and belongs to the caller.
 */
static struct fds
prepare(void)
{
	struct fds fds = {0};
	struct pollfd *slot;
	struct task *tk;

	/* First pass: count the entries to allocate. */
	TAILQ_FOREACH(tk, &tasks, link) {
		if (tk->status == TASKST_RUNNING || tk->status == TASKST_COMPLETED)
			fds.listsz++;
	}

	fds.list = util_calloc(fds.listsz, sizeof (*fds.list));
	slot = fds.list;

	/* Second pass: fill them in queue order. */
	TAILQ_FOREACH(tk, &tasks, link) {
		if (tk->status != TASKST_RUNNING && tk->status != TASKST_COMPLETED)
			continue;

		slot->fd = tk->pipe[0];
		slot->events = POLLIN | POLLPRI;
		++slot;
	}

	return fds;
}

/*
 * One iteration of the main loop: wait up to 5 seconds for output from the
 * children, then fetch new jobs, collect the output, start pending tasks and
 * upload finished ones.
 *
 * Exits the process if poll(2) fails for a reason other than a signal
 * interruption.
 */
static void
run(void)
{
	struct fds fds;

	fds = prepare();

	if (poll(fds.list, fds.listsz, 5000) < 0 && errno != EINTR)
		err(1, "poll");

	fetchjobs();
	readall(&fds);
	startall();
	flushall();

	/* The set is rebuilt at every iteration, do not leak the previous one. */
	free(fds.list);
}

static void
finish(void)
{
	size_t tot = 0;
	struct task *tk, *tmp;

	TAILQ_FOREACH(tk, &tasks, link)
		tot++;

	signal(SIGCHLD, SIG_IGN);
	log_debug("killing remaining %zu tasks", tot);

	TAILQ_FOREACH_SAFE(tk, &tasks, link, tmp) {
		if (tk->status == TASKST_RUNNING) {
			kill(tk->child, SIGTERM);
			waitpid(tk->child, NULL, 0);
		}

		destroy(tk);
	}
}

/*
 * Daemon entry point.
 *
 * Options:
 *   -m maxbuild  maximum number of simultaneous builds (0..INT_MAX)
 *   -u url       base URL of the server
 *   -w worker    name of this worker
 *
 * Runs the main loop until a termination signal clears `alive', then cleans
 * up the remaining tasks.
 */
int
main(int argc, char **argv)
{
	const char *errstr = NULL;
	int opt;

	setprogname("sciworkerd");

	for (;;) {
		opt = getopt(argc, argv, "m:u:w:");

		if (opt == -1)
			break;

		switch (opt) {
		case 'm':
			config.maxbuilds = strtonum(optarg, 0, INT_MAX, &errstr);

			if (errstr)
				errx(1, "%s: %s", optarg, errstr);

			break;
		case 'u':
			config.url = optarg;
			break;
		case 'w':
			config.worker = optarg;
			break;
		default:
			usage();
			break;
		}
	}

	init();

	while (alive)
		run();

	finish();

	return 0;
}
#endif










#include <err.h>
#include <errno.h>
#include <poll.h>
#include <signal.h>
#include <stdio.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

#include "types.h"
#include "task.h"

/*
 * Shell script given to the test task: it prints "yes" on stdout, sleeps 10
 * seconds, prints "no" on stderr, sleeps 1 more second and exits with code 1.
 */
#define SCRIPT \
	"#!/bin/sh\n" \
	"echo yes\n" \
	"sleep 10\n" \
	"echo no 1>&2\n" \
	"sleep 1\n" \
	"exit 1"

/*
 * Test driver for the task API.
 *
 * Creates a task for a dummy job, installs SCRIPT as its script and starts
 * it. The task is then polled every 250 ms until either task_sync() reports
 * completion (0) or 3 seconds have elapsed according to task_uptime(), in
 * which case the task is killed. Finally the exit/signal code and the
 * captured console are printed.
 *
 * Exits with code 1 through err(3) if the setup, the start, poll(2) or the
 * synchronization fails.
 */
int
main(void)
{
	struct job job = {
		.project_id = 10,
		.id = 10,
		.tag = "1234"
	};
	struct pollfd fd;
	struct task *t;
	int run = 1;

	t = task_new(&job);

	if (task_setup(t, SCRIPT) < 0)
		err(1, "task_setup");
	if (task_start(t) < 0)
		err(1, "task_start");

	while (run) {
		/* Give up on tasks running for 3 seconds or more. */
		if (difftime(time(NULL), task_uptime(t)) >= 3) {
			printf("task timeout !\n");
			task_kill(t);
			task_wait(t);
			break;
		}

		task_prepare(t, &fd);

		/* A signal interruption is not an error, simply loop again. */
		if (poll(&fd, 1, 250) < 0 && errno != EINTR)
			err(1, "poll");

		switch (task_sync(t, &fd)) {
		case -1:
			/* err() does not return, no fallthrough happens here. */
			err(1, "task_sync");
		case 0:
			run = 0;
			task_wait(t);
			break;
		default:
			/* Keep going... */
			break;
		}
	}

	switch (task_status(t)) {
	case TASKSTATUS_EXITED:
		printf("process exited with code: %d\n", task_code(t).exitcode);
		break;
	case TASKSTATUS_KILLED:
		printf("process killed with signal %d\n", task_code(t).sigcode);
		break;
	default:
		break;
	}

	printf("== console ==\n%s==\n", task_console(t));
	task_free(t);

	return 0;
}