view sciworkerd.c @ 3:215c0c3b3609

misc: use JSON everywhere (scictl/sciwebd)
author David Demelier <markand@malikania.fr>
date Mon, 14 Jun 2021 22:08:24 +0200
parents 5fa3d2f479b2
children eb76429ce112
line wrap: on
line source

#include <sys/queue.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <assert.h>
#include <err.h>
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <poll.h>
#include <signal.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdnoreturn.h>
#include <string.h>
#include <unistd.h>

#include <curl/curl.h>
#include <jansson.h>

#include "config.h"
#include "log.h"
#include "types.h"
#include "util.h"

/* Size (including the terminating NUL) of the job tag stored in a task. */
#define TAG_MAX 256

/*
 * Lifecycle state of a task. Tasks are calloc'ed, so a fresh task starts
 * as TASKST_PENDING.
 */
enum taskst {
	TASKST_PENDING,         /* not started yet. */
	TASKST_RUNNING,         /* currently running; counted against config.maxbuilds. */
	TASKST_COMPLETED,       /* child reaped by the SIGCHLD handler, output may still be draining. */
	TASKST_SYNCING          /* output fully read or upload failed: result must be (re)sent to the host. */
};

/*
 * One job received from the server, kept in the global task list from the
 * moment it is queued until its result has been uploaded.
 */
struct task {
	enum taskst status;
	/* pid of the process running the script. */
	pid_t child;
	/*
	 * Output pipe: [0] is the read end polled by the daemon, [1] the write
	 * end duplicated onto the child's stdout/stderr. 0 means "not open".
	 */
	int pipe[2];
	/* Status recorded when the child is reaped. */
	int exitcode;
	/* Job identifier as returned by the server. */
	int job_id;
	/* Project whose script is executed for this job. */
	int project_id;
	/* Tag passed as the first argument of the script. */
	char job_tag[TAG_MAX];
	/* Accumulated stdout/stderr of the child, truncated when full. */
	char out[SCI_CONSOLE_MAX];
	/* Path of the temporary script file created with mkstemp(). */
	char script[PATH_MAX];
	/* Descriptor of the script while it is being written; 0 once closed. */
	int scriptfd;
	TAILQ_ENTRY(task) link;
};

/* Declares `struct tasks', the head type of the task list. */
TAILQ_HEAD(tasks, task);

/* Set of descriptors handed to poll(): one entry per watched task pipe. */
struct fds {
	struct pollfd *list;
	size_t listsz;
};

/*
 * Termination record of a child process. Only referenced by the disabled
 * (#if 0) self-pipe code of the SIGCHLD handler.
 */
struct result {
	pid_t pid;
	int status;
};

/*
 * In-memory response buffer. NOTE(review): not referenced by the visible
 * source (get() uses a local buffer) — confirm and remove if unused.
 */
struct fetch {
	char buf[SCI_MSG_MAX];
	FILE *bufp;
};

/* Runtime configuration; defaults below, overridden by options in main(). */
static struct {
	char *url;              /* base URL of the server (-u). */
	char *worker;           /* worker name used in API paths (-w). */
	int maxbuilds;          /* limit of simultaneously running builds (-m). */
} config = {
	.url = "http://localhost",
	.worker = "default",
	.maxbuilds = 4
};

/* Every known task, in the order it was queued. */
static struct tasks tasks = TAILQ_HEAD_INITIALIZER(tasks);
/* This worker as described by the server (filled by fetchworker()). */
static struct worker worker;

/* Self-pipe to forward SIGCHLD results to the main loop; currently disabled. */
#if 0
static int sigpipe[2];
#endif

/*
 * Print the command line synopsis on stderr and terminate with status 1.
 */
noreturn static void
usage(void)
{
	const char *name = getprogname();

	fprintf(stderr, "usage: %s [-m maxbuild] [-u url] [-w worker]\n", name);
	exit(1);
}

/*
 * Return the task whose output pipe read end is fd, or NULL if no task
 * owns that descriptor.
 */
static inline struct task *
find_by_fd(int fd)
{
	struct task *it = TAILQ_FIRST(&tasks);

	while (it != NULL && it->pipe[0] != fd)
		it = TAILQ_NEXT(it, link);

	return it;
}

/*
 * Return the task whose script process has the given pid, or NULL when the
 * pid does not belong to any task.
 */
static inline struct task *
find_by_pid(pid_t pid)
{
	struct task *it;

	for (it = TAILQ_FIRST(&tasks); it != NULL; it = TAILQ_NEXT(it, link)) {
		if (it->child == pid)
			break;
	}

	return it;
}

/*
 * Release every resource owned by tk, unlink it from the task list and
 * free it. tk must not be used afterwards.
 *
 * Descriptors use 0 as "not open" (tasks are calloc'ed). The temporary
 * script is removed whenever a path was generated, not only while its
 * descriptor is still open: extract() closes scriptfd (and resets it to 0)
 * once the script is written, so keying the unlink on scriptfd left every
 * executed script behind.
 */
static void
destroy(struct task *tk)
{
	log_debug("destroying task %d", tk->job_id);

	if (tk->pipe[0] > 0)
		close(tk->pipe[0]);
	if (tk->pipe[1] > 0)
		close(tk->pipe[1]);
	if (tk->scriptfd > 0)
		close(tk->scriptfd);
	if (tk->script[0] != '\0')
		unlink(tk->script);

	TAILQ_REMOVE(&tasks, tk, link);
	memset(tk, 0, sizeof (*tk));
	free(tk);
}

static int
spawn(struct task *tk)
{
	if (pipe(tk->pipe) < 0)
		goto cleanup;

	switch ((tk->child = fork())) {
	case -1:
		log_warn("spawn: %s", strerror(errno));
		goto cleanup;
	case 0:
		/* Child. */
		dup2(tk->pipe[1], STDOUT_FILENO);
		dup2(tk->pipe[1], STDERR_FILENO);
		close(tk->pipe[0]);
		close(tk->pipe[1]);
		log_debug("spawn: running process (%lld) %s",
		    (long long int)tk->child, tk->script);

		tk->status = TASKST_RUNNING;

		if (execl(tk->script, tk->script, tk->job_tag, NULL) < 0) {
			tk->status = TASKST_PENDING;
			log_warn("exec %s: %s", tk->script, strerror(errno));
			exit(0);
		}
		break;
	default:
		/* Parent */
		break;
	}

	return 0;

cleanup:
	destroy(tk);

	return -1;
}

/*
 * Build "<config.url>/<page>" where page is formatted from fmt and the
 * variadic arguments. The page part is limited to 127 characters and the
 * whole URL to 255; longer values are silently truncated.
 *
 * Returns a pointer to a static buffer that the next call overwrites.
 */
static const char *
makeurl(const char *fmt, ...)
{
	static char url[256];
	char page[128];
	va_list args;

	assert(fmt);

	memset(page, 0, sizeof (page));

	va_start(args, fmt);
	vsnprintf(page, sizeof (page), fmt, args);
	va_end(args);

	snprintf(url, sizeof (url), "%s/%s", config.url, page);

	return url;
}

/*
 * SIGCHLD handler: reap every terminated child and mark its task as
 * TASKST_COMPLETED, closing the write end of its output pipe so that the
 * main loop sees end of file once the output is drained.
 *
 * Several children exiting close together may be reported by a single
 * SIGCHLD, so waitpid() is looped with WNOHANG rather than waiting only for
 * sinfo->si_pid. Children killed by a signal are reaped as well; otherwise
 * they remain zombies and their task never leaves TASKST_RUNNING.
 *
 * exitcode receives the decoded exit status, or 128 + the signal number for
 * a child terminated by a signal, instead of the raw wait status.
 *
 * Only async-signal-safe functions are called and errno is preserved, since
 * the main loop can be interrupted anywhere. NOTE(review): the handler still
 * walks and updates the task list that the main loop mutates; forwarding the
 * results through a self-pipe would remove that race.
 */
static void
complete(int signum, siginfo_t *sinfo, void *ctx)
{
	struct task *tk;
	int saved_errno = errno;
	int status;
	pid_t pid;

	(void)signum;
	(void)sinfo;
	(void)ctx;

	while ((pid = waitpid(-1, &status, WNOHANG)) > 0) {
		if (!(tk = find_by_pid(pid)))
			continue;

		if (tk->pipe[1] > 0) {
			close(tk->pipe[1]);
			tk->pipe[1] = 0;
		}

		/* Without WUNTRACED only exited or signaled children are returned. */
		tk->exitcode = WIFEXITED(status)
		    ? WEXITSTATUS(status)
		    : 128 + WTERMSIG(status);
		tk->status = TASKST_COMPLETED;
	}

	errno = saved_errno;
}

/*
 * Encode the result of tk (job, exit code, output, worker) as a compact
 * JSON document.
 *
 * Returns a string allocated by json_dumps() that the caller owns and must
 * free(), or NULL when encoding fails. The return type is not const so that
 * the ownership transfer is visible to callers.
 */
static char *
uploadenc(const struct task *tk)
{
	struct jobresult res = {0};
	json_t *doc;
	char *dump;

	res.job_id = tk->job_id;
	res.exitcode = tk->exitcode;
	res.log = tk->out;
	res.worker_id = worker.id;

	if (!(doc = jobresult_to(&res, 1)))
		return NULL;

	dump = json_dumps(doc, JSON_COMPACT);
	json_decref(doc);

	return dump;
}

/*
 * libcurl CURLOPT_WRITEFUNCTION callback appending the received chunk to
 * the FILE given as CURLOPT_WRITEDATA.
 *
 * The userdata parameter is declared void * to match the prototype libcurl
 * calls through. Returns the number of bytes consumed (size * nmemb); on a
 * short write (e.g. the response does not fit the destination buffer) 0 is
 * returned, which makes libcurl abort the transfer with CURLE_WRITE_ERROR.
 */
static size_t
getcb(char *in, size_t n, size_t w, void *data)
{
	FILE *fp = data;
	size_t total = n * w;

	if (fwrite(in, 1, total, fp) != total)
		return log_warn("get: %s", strerror(errno)), 0;

	return total;
}

/*
 * Perform a GET request on url and parse the response body as JSON.
 *
 * topic prefixes every warning. Returns a new reference owned by the caller
 * (release it with json_decref()), or NULL on transport error, non-200
 * status, response larger than the buffer, or invalid JSON.
 */
static json_t *
get(const char *topic, const char *url)
{
	CURL *curl;
	CURLcode code;

	json_t *doc;
	json_error_t error;

	char buf[SCI_MSG_MAX];
	long status = 0;
	FILE *fp;

	if (!(curl = curl_easy_init()))
		return log_warn("%s: unable to initialize curl", topic), NULL;

	/*
	 * The last byte of buf is kept outside of the stream and set to NUL so
	 * json_loads() always gets a terminated string, even when the response
	 * fills the whole stream (fmemopen adds no terminator in that case).
	 */
	buf[0] = buf[sizeof (buf) - 1] = '\0';

	if (!(fp = fmemopen(buf, sizeof (buf) - 1, "w")))
		err(1, "fmemopen");

	curl_easy_setopt(curl, CURLOPT_URL, url);
	curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
	curl_easy_setopt(curl, CURLOPT_TIMEOUT, 3L);
	curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, getcb);
	curl_easy_setopt(curl, CURLOPT_WRITEDATA, fp);

	code = curl_easy_perform(curl);
	curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status);
	curl_easy_cleanup(curl);

	/* Flushes the received data into buf. */
	fclose(fp);

	if (code != CURLE_OK)
		return log_warn("%s: %s", topic, curl_easy_strerror(code)), NULL;
	if (status != 200)
		return log_warn("%s: unexpected status code %ld", topic, status), NULL;
	if (!(doc = json_loads(buf, 0, &error)))
		return log_warn("%s: %s", topic, error.text), NULL;

	return doc;
}

/*
 * libcurl write callback that throws the response body away while
 * reporting every chunk as consumed, so the transfer is not aborted.
 */
static size_t
silent(char *ptr, size_t size, size_t nmemb, void *userdata)
{
	(void)userdata;
	(void)size;
	(void)ptr;

	return nmemb;
}

/*
 * POST the result of tk to the jobs API.
 *
 * The task is put in TASKST_SYNCING first, so any failure (encoding,
 * transport, non-200 answer) leaves it queued for flushall() to retry
 * without running the job again. On success the task is destroyed and tk
 * must not be used afterwards.
 */
static void
upload(struct task *tk)
{
	CURL *curl;
	CURLcode code;
	long status = 0;
	const char *body;

	tk->status = TASKST_SYNCING;

	/*
	 * A NULL body must never reach CURLOPT_POSTFIELDS: libcurl would then
	 * read the request body through its default read callback (stdin).
	 */
	if (!(body = uploadenc(tk))) {
		log_warn("upload: unable to encode job result");
		return;
	}
	if (!(curl = curl_easy_init())) {
		log_warn("upload: unable to initialize curl");
		free((void *)body);
		return;
	}

	curl_easy_setopt(curl, CURLOPT_URL, makeurl("api/v1/jobs"));
	curl_easy_setopt(curl, CURLOPT_TIMEOUT, 3L);
	curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
	curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, silent);
	/* Not copied by libcurl: body must outlive curl_easy_perform(). */
	curl_easy_setopt(curl, CURLOPT_POSTFIELDS, body);

	if ((code = curl_easy_perform(curl)) == CURLE_OK)
		curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status);

	curl_easy_cleanup(curl);
	/* The body was allocated by json_dumps() in uploadenc(). */
	free((void *)body);

	if (code != CURLE_OK)
		log_warn("upload: %s", curl_easy_strerror(code));
	else if (status != 200)
		log_warn("upload: unexpected return code: %ld", status);
	else
		destroy(tk);
}

/*
 * Report the end of tk (log line plus its captured output on stdout) and
 * upload its result. upload() may destroy tk, so it must not be used after
 * this call. NOTE(review): no caller in the visible source — confirm it is
 * still needed.
 */
static inline void
finished(struct task *tk)
{
	/* pid_t has no printf conversion: cast as the other log calls do. */
	log_info("task %d (pid %lld): completed with exit code %d",
	    tk->job_id, (long long int)tk->child, tk->exitcode);
	printf("== OUTPUT ==\n");
	puts(tk->out);
	upload(tk);
}

static inline int
pending(int id)
{
	struct task *t;

	TAILQ_FOREACH(t, &tasks, link)
		if (t->job_id == id)
			return 1;

	return 0;
}

/*
 * Append a new task for job id of project project_id, to be run with tag as
 * argument (truncated to TAG_MAX - 1 characters). The task is calloc'ed, so
 * it starts as TASKST_PENDING.
 */
static void
queue(int id, int project_id, const char *tag)
{
	struct task *tk;

	/* No trailing newline: like every other log call in this file. */
	log_info("queued job build (%d) for tag %s", id, tag);

	tk = util_calloc(1, sizeof (*tk));
	tk->job_id = id;
	tk->project_id = project_id;
	strlcpy(tk->job_tag, tag, sizeof (tk->job_tag));

	TAILQ_INSERT_TAIL(&tasks, tk, link);
}

/*
 * Queue every job listed in doc that has no task yet. The doc reference is
 * consumed (released) on every path.
 */
static void
merge(json_t *doc)
{
	struct job jobs[SCI_JOB_LIST_MAX];
	ssize_t count;

	count = job_from(jobs, UTIL_SIZE(jobs), doc);

	if (count < 0)
		log_warn("fetchjobs: %s", strerror(errno));

	for (ssize_t n = 0; n < count; n++) {
		const struct job *j = &jobs[n];

		if (pending(j->id))
			continue;

		queue(j->id, j->project_id, j->tag);
	}

	json_decref(doc);
}

/*
 * Download the list of jobs assigned to this worker and queue the new ones.
 */
static void
fetchjobs(void)
{
	json_t *doc = get("fetch", makeurl("api/v1/jobs/%s", config.worker));

	if (doc == NULL) {
		log_warn("unable to retrieve jobs");
		return;
	}

	merge(doc);
}

/*
 * Read the stdout/stderr pipes that poll() reported as ready and append the
 * data to each task's output (bytes beyond the buffer are dropped).
 *
 * End of file, or a genuine read error, moves the task to TASKST_SYNCING so
 * that flushall() uploads its result. A read interrupted by a signal (EINTR)
 * or that would block (EAGAIN) is not the end of the output: the pipe is
 * simply retried on the next loop, rather than uploading — and possibly
 * destroying — a task whose child may still be running.
 */
static void
readall(struct fds *fds)
{
	struct task *tk;
	char buf[BUFSIZ];
	ssize_t nr;

	for (size_t i = 0; i < fds->listsz; ++i) {
		if (fds->list[i].revents == 0)
			continue;
		if (!(tk = find_by_fd(fds->list[i].fd)))
			continue;

		if ((nr = read(fds->list[i].fd, buf, sizeof (buf) - 1)) > 0) {
			buf[nr] = '\0';
			strlcat(tk->out, buf, sizeof (tk->out));
		} else if (nr == 0 || (errno != EINTR && errno != EAGAIN))
			tk->status = TASKST_SYNCING;
	}
}

/*
 * Retrieve status code from spawned process complete or upload again if they
 * failed to sync.
 */
static void
flushall(void)
{
	struct task *tk, *tmp;

	TAILQ_FOREACH_SAFE(tk, &tasks, link, tmp)
		if (tk->status == TASKST_SYNCING)
			upload(tk);
}

/*
 * Decode the project in doc and write its script into tk->scriptfd, then
 * close that descriptor (resetting it to 0) so the file can be executed.
 *
 * The doc reference is consumed on every path; it is released only after
 * the script has been written because the decoded project may point into
 * it. Returns 0 on success, -1 on failure (scriptfd is then left open for
 * the caller to clean up).
 */
static int
extract(struct task *tk, json_t *doc)
{
	struct project proj;
	size_t len, off = 0;
	ssize_t nw;
	int rc = -1;

	if (project_from(&proj, 1, doc) < 0) {
		log_warn("fetchproject: %s", strerror(errno));
		goto out;
	}

	len = strlen(proj.script);

	/* write() may be partial or interrupted: loop until everything is out. */
	while (off < len) {
		nw = write(tk->scriptfd, proj.script + off, len - off);

		if (nw < 0 && errno == EINTR)
			continue;
		if (nw <= 0) {
			log_warn("fetchproject: %s",
			    nw < 0 ? strerror(errno) : "short write");
			goto out;
		}

		off += (size_t)nw;
	}

	/* Close so we can finally spawn it. */
	close(tk->scriptfd);
	tk->scriptfd = 0;
	rc = 0;

out:
	json_decref(doc);

	return rc;
}

/*
 * Download the project of tk and write its script into the task's
 * temporary file. Returns 0 on success, -1 on failure.
 */
static int
fetchproject(struct task *tk)
{
	const char *url = makeurl("api/v1/projects/%d", tk->project_id);
	json_t *doc = get("fetchproject", url);

	return doc != NULL ? extract(tk, doc) : -1;
}

/*
 * Start a pending task: write the project's script into a private temporary
 * executable file (retrieved at this moment) and spawn it. Tasks that are
 * not TASKST_PENDING are ignored.
 *
 * When a step before spawning fails, the temporary file is removed and the
 * task stays pending, to be retried on a later loop. If spawn() itself fails
 * it destroys the task.
 */
static void
createtask(struct task *tk)
{
	if (tk->status != TASKST_PENDING)
		return;

	log_debug("creating task (id=%d, tag=%s)", tk->job_id, tk->job_tag);
	snprintf(tk->script, sizeof (tk->script), "/tmp/sciworkerd-%d-XXXXXX", tk->job_id);

	if ((tk->scriptfd = mkstemp(tk->script)) < 0) {
		/* Nothing was created: do not unlink the template path. */
		log_warn("%s", strerror(errno));
		tk->scriptfd = 0;
		tk->script[0] = '\0';
		return;
	}

	/* Log before cleaning up: unlink()/close() would clobber errno. */
	if (fchmod(tk->scriptfd, S_IRUSR | S_IWUSR | S_IXUSR) < 0) {
		log_warn("%s", strerror(errno));
		goto fail;
	}
	if (fetchproject(tk) < 0)
		goto fail;

	spawn(tk);
	return;

fail:
	unlink(tk->script);
	tk->script[0] = '\0';

	/* extract() only closes the descriptor on success. */
	if (tk->scriptfd > 0) {
		close(tk->scriptfd);
		tk->scriptfd = 0;
	}
}

/*
 * Start all pending tasks if the limit of running tasks is not reached.
 */
static void
startall(void)
{
	size_t nrunning = 0;
	struct task *tk;

	TAILQ_FOREACH(tk, &tasks, link)
		if (tk->status == TASKST_RUNNING)
			++nrunning;

	if (nrunning >= (size_t)config.maxbuilds)
		log_debug("not spawning new process because limit is reached");
	else {
		tk = TAILQ_FIRST(&tasks);

		while (tk && nrunning++ < (size_t)config.maxbuilds) {
			createtask(tk);
			tk = TAILQ_NEXT(tk, link);
		}
	}
}

/*
 * Retrieve this worker's record from the server into the global `worker'
 * and log it. Terminates the program when the worker cannot be retrieved.
 * NOTE(review): if worker_from() keeps pointers into doc, worker.name and
 * worker.desc dangle after json_decref() — confirm it copies them.
 */
static void
fetchworker(void)
{
	const char *url = makeurl("api/v1/workers/%s", config.worker);
	json_t *doc = get("fetchworker", url);

	if (doc == NULL || worker_from(&worker, 1, doc) < 0)
		errx(1, "unable to retrieve worker id");

	log_info("worker id: %d", worker.id);
	log_info("worker name: %s", worker.name);
	log_info("worker description: %s", worker.desc);

	json_decref(doc);
}

static void
init(void)
{
	struct sigaction sa;

	sa.sa_flags = SA_SIGINFO;
	sa.sa_sigaction = complete;
	sigemptyset(&sa.sa_mask);

	if (sigaction(SIGCHLD, &sa, NULL) < 0)
		err(1, "sigaction");

	log_open("sciworkerd");
	fetchworker();

#if 0
	if (pipe(sigpipe) < 0)
		err(1, "pipe");
	if ((flags = fcntl(sigpipe[1], F_GETFL, 0)) < 0 ||
	    fcntl(sigpipe[1], F_SETFL, flags | O_NONBLOCK) < 0)
		err(1, "fcntl");
#endif
}

static struct fds
prepare(void)
{
	struct fds fds = {0};
	struct task *tk;
	size_t i = 0;

	TAILQ_FOREACH(tk, &tasks, link)
		if (tk->status == TASKST_RUNNING || tk->status == TASKST_COMPLETED)
			fds.listsz++;

	fds.list = util_calloc(fds.listsz, sizeof (*fds.list));

#if 0
	fds.list[0].fd = sigpipe[0];
	fds.list[0].events = POLLIN;
#endif

	TAILQ_FOREACH(tk, &tasks, link) {
		if (tk->status == TASKST_RUNNING || tk->status == TASKST_COMPLETED) {
			fds.list[i].fd = tk->pipe[0];
			fds.list[i++].events = POLLIN | POLLPRI;
		}
	}

	return fds;
}

/*
 * One iteration of the main loop: wait up to 5 seconds for output from the
 * children, then fetch new jobs, drain the ready pipes, start pending tasks
 * and upload pending results.
 */
static void
run(void)
{
	struct fds fds;

	fds = prepare();

	if (poll(fds.list, fds.listsz, 5000) < 0 && errno != EINTR)
		err(1, "poll");

	fetchjobs();
	readall(&fds);
	startall();
	flushall();

	/* prepare() allocates a new poll set on every iteration. */
	free(fds.list);
}

int
main(int argc, char **argv)
{
	int ch;
	const char *errstr;

	setprogname("sciworkerd");

	while ((ch = getopt(argc, argv, "m:u:w:")) != -1) {
		switch (ch) {
		case 'm':
			config.maxbuilds = strtonum(optarg, 0, INT_MAX, &errstr);

			if (errstr)
				errx(1, "%s: %s", optarg, errstr);

			break;
		case 'u':
			config.url = optarg;
			break;
		case 'w':
			config.worker = optarg;
			break;
		default:
			usage();
			break;
		}
	}

	init();

	for (;;)
		run();
}