diff sciworkerd/sciworkerd.c @ 27:dae2de19ca5d

misc: switch to JSON everywhere
author David Demelier <markand@malikania.fr>
date Wed, 03 Aug 2022 15:18:09 +0200
parents 34cbbd215ef7
children e52c762d8ba8
line wrap: on
line diff
--- a/sciworkerd/sciworkerd.c	Tue Aug 02 13:24:13 2022 +0200
+++ b/sciworkerd/sciworkerd.c	Wed Aug 03 15:18:09 2022 +0200
@@ -1,6 +1,7 @@
 #include <errno.h>
 #include <poll.h>
 #include <signal.h>
+#include <stdint.h>
 #include <string.h>
 #include <time.h>
 
@@ -10,21 +11,21 @@
 #include "log.h"
 #include "sciworkerd.h"
 #include "task.h"
-#include "types.h"
 #include "util.h"
 
 #define TAG "sciworkerd: "
 
 struct taskentry {
+	intmax_t job_id;
+	char *tag;
+	char *project_name;
 	struct task *task;
-	struct job job;
 	struct taskentry *next;
 };
 
 static struct taskentry *taskpending;
 static struct taskentry *tasks;
 static struct taskentry *taskfinished;
-static struct worker worker;
 static int run = 1;
 
 struct sciworkerd sciworkerd = {
@@ -53,41 +54,57 @@
 }
 
 static inline int
-pending(int id)
+pending(intmax_t job_id)
 {
 	const struct taskentry *iter;
 
 	LL_FOREACH(taskpending, iter)
-		if (iter->job.id == id)
+		if (iter->job_id == job_id)
 			return 1;
 
 	return 0;
 }
 
 static inline void
-queue(const struct job *job)
+queue(intmax_t id, const char *tag, const char *project_name)
 {
 	struct taskentry *tk;
 
-	log_info(TAG "queued job build (%d) for tag %s", job->id, job->tag);
+	log_info(TAG "queued job build (%d) for tag %s", id, tag);
 
 	tk = util_calloc(1, sizeof (*tk));
-	tk->task = task_new(job->tag);
-	memcpy(&tk->job, job, sizeof (*job));
+	tk->job_id = id;
+	tk->tag = util_strdup(tag);
+	tk->project_name = util_strdup(project_name);
+	tk->task = task_new(tag);
 	LL_APPEND(taskpending, tk);
 }
 
 static void
-merge(struct job *jobs, size_t jobsz)
+merge(json_t *jobs)
 {
-	size_t total = 0;
+	json_int_t id;
+	json_t *val;
+	const char *tag, *project_name;
+	size_t total = 0, i;
+	int parse;
 
-	for (size_t i = 0; i < jobsz; ++i) {
-		if (!pending(jobs[i].id)) {
-			queue(&jobs[i]);
+	json_array_foreach(jobs, i, val) {
+		parse = json_unpack(val, "{sI ss ss}",
+			"id",           &id,
+			"tag",          &tag,
+			"project_name", &project_name
+		);
+
+		if (parse < 0) {
+			log_warn(TAG "unable to parse job");
+			continue;
+		}
+
+		if (!pending(id)) {
+			queue(id, tag, project_name);
 			total++;
-		} else
-			job_finish(&jobs[i]);
+		}
 	}
 
 	log_info(TAG "added %zu new pending tasks", total);
@@ -102,38 +119,21 @@
 	static time_t startup;
 	time_t now;
 	struct apic req;
-	struct job todo[SCI_JOB_LIST_MAX];
-	ssize_t todosz;
+	json_t *jobs;
 
 	if (difftime((now = time(NULL)), startup) >= sciworkerd.fetchinterval) {
 		startup = now;
-
 		log_info(TAG "fetching jobs");
 
-		if ((todosz = apic_job_todo(&req, todo, UTIL_SIZE(todo), worker.name)) < 0)
+		if (!(jobs = apic_job_todo(&req, sciworkerd.name)))
 			log_warn(TAG "unable to fetch jobs: %s", req.error);
-		else
-			merge(todo, todosz);
-
-		apic_finish(&req);
+		else {
+			merge(jobs);
+			json_decref(jobs);
+		}
 	}
 }
 
-/*
- * Fetch information about myself.
- */
-static void
-fetch_worker(void)
-{
-	struct apic req;
-
-	if (apic_worker_find(&req, &worker, sciworkerd.name) < 0)
-		log_die(TAG "unable to fetch worker info: %s", req.error);
-
-	log_info("sciworkerd: worker %s (%s)", worker.name, worker.desc);
-	apic_finish(&req);
-}
-
 static inline size_t
 count(const struct taskentry *head)
 {
@@ -154,14 +154,19 @@
 start(struct taskentry *entry)
 {
 	struct apic req;
-	struct project project;
+	json_t *doc;
+	const char *script;
 	pid_t pid;
 	int ret = -1;
 
-	if (apic_project_find(&req, &project, entry->job.project_name) < 0)
+	if (!(doc = apic_project_find(&req, entry->project_name)))
 		return log_warn(TAG "unable to fetch project, dropping task"), -1;
+	if (json_unpack(doc, "{ss}", "script", &script) < 0) {
+		json_decref(doc);
+		return log_warn(TAG "invalid project JSON object"), -1;
+	}
 
-	if (task_setup(entry->task, project.script) < 0)
+	if (task_setup(entry->task, script) < 0)
 		log_warn(TAG "unable to setup script code: %s, dropping task", strerror(errno));
 	else if ((pid = task_start(entry->task)) < 0)
 		log_warn(TAG "unable to spawn task process: %s", strerror(errno));
@@ -170,7 +175,7 @@
 		ret = 0;
 	}
 
-	project_finish(&project);
+	json_decref(doc);
 
 	return ret;
 }
@@ -297,24 +302,24 @@
 static int
 publish(struct taskentry *iter)
 {
-	// TODO: add sigcode.
-	struct taskcode code = task_code(iter->task);
-	struct jobresult res = {
-		.job_id = iter->job.id,
-		.exitcode = code.exitcode,
-		.log = util_strdup(task_console(iter->task)),
-		.worker_name = worker.name
-	};
 	struct apic req;
+	json_t *obj;
 	int ret = 0; 
 
-	if (apic_jobresult_add(&req, &res) < 0) {
+	obj = json_pack("{sI ss ss si si}",
+		"job_id",       iter->job_id,
+		"worker_name",  sciworkerd.name,
+		"console",      task_console(iter->task),
+		"exitcode",     task_code(iter->task).exitcode,
+		"sigcode",      task_code(iter->task).sigcode
+	);
+
+	if (apic_jobresult_add(&req, obj) < 0) {
 		log_warn(TAG "unable to publish task: %s", req.error);
 		ret = -1;
 	}
 
-	apic_finish(&req);
-	jobresult_finish(&res);
+	json_decref(obj);
 
 	return ret;
 }
@@ -339,6 +344,9 @@
 
 	log_open("sigworkerd");
 
+	if (strlen(sciworkerd.name) == 0)
+		log_die(TAG "no worker name defined");
+
 	sigemptyset(&sa.sa_mask);
 	sa.sa_handler = stop;
 
@@ -346,8 +354,6 @@
 
 	if (sigaction(SIGINT, &sa, NULL) < 0 || sigaction(SIGTERM, &sa, NULL) < 0)
 		log_die(TAG "sigaction: %s", strerror(errno));
-
-	fetch_worker();
 }
 
 void