diff sciworkerd/sciworkerd.c @ 24:34cbbd215ef7

misc: add basic support for jobresults
author David Demelier <markand@malikania.fr>
date Mon, 25 Jul 2022 21:11:23 +0200
parents 2cb228f23f53
children dae2de19ca5d
line wrap: on
line diff
--- a/sciworkerd/sciworkerd.c	Thu Jul 21 21:55:02 2022 +0200
+++ b/sciworkerd/sciworkerd.c	Mon Jul 25 21:11:23 2022 +0200
@@ -13,7 +13,7 @@
 #include "types.h"
 #include "util.h"
 
-#define TAG "sigworkerd: "
+#define TAG "sciworkerd: "
 
 struct taskentry {
 	struct task *task;
@@ -48,7 +48,7 @@
 static void
 stop(int sign)
 {
-	log_info(TAG "exiting on signal %d\n", sign);
+	log_info(TAG "exiting on signal %d", sign);
 	run = 0;
 }
 
@@ -69,7 +69,7 @@
 {
 	struct taskentry *tk;
 
-	log_info(TAG "queued job build (%d) for tag %s\n", job->id, job->tag);
+	log_info(TAG "queued job build (%d) for tag %s", job->id, job->tag);
 
 	tk = util_calloc(1, sizeof (*tk));
 	tk->task = task_new(job->tag);
@@ -82,7 +82,7 @@
 {
 	size_t total = 0;
 
-	for (ssize_t i = 0; i < jobsz; ++i) {
+	for (size_t i = 0; i < jobsz; ++i) {
 		if (!pending(jobs[i].id)) {
 			queue(&jobs[i]);
 			total++;
@@ -130,9 +130,7 @@
 	if (apic_worker_find(&req, &worker, sciworkerd.name) < 0)
 		log_die(TAG "unable to fetch worker info: %s", req.error);
 
-	log_info("worker name: %s", worker.name);
-	log_info("worker description: %s", worker.desc);
-
+	log_info("sciworkerd: worker %s (%s)", worker.name, worker.desc);
 	apic_finish(&req);
 }
 
@@ -191,10 +189,11 @@
 	size_t running = count(tasks);
 	struct taskentry *entry;
 
-	while (running-- > 0 && (entry = taskpending)) {
+	while (running < sciworkerd.maxjobs && (entry = taskpending)) {
 		if (start(entry) < 0)
 			delete(entry);
 		else {
+			running++;
 			LL_DELETE(taskpending, entry);
 			LL_APPEND(tasks, entry);
 		}
@@ -208,6 +207,7 @@
 	struct taskcode code;
 	struct pollfd *fds;
 	size_t fdsz, i = 0;
+	pid_t pid;
 	int ret;
 
 	/* First, read every pipes. */
@@ -239,6 +239,8 @@
 
 		/* Now wait for the task to complete. */
 		if (ret <= 0) {
+			pid = task_pid(iter->task);
+
 			if (task_wait(iter->task) < 0)
 				log_warn(TAG "task wait error: %s", strerror(errno));
 			else {
@@ -247,11 +249,11 @@
 				switch (task_status(iter->task)) {
 				case TASKSTATUS_EXITED:
 					log_info(TAG "task %lld exited with code %d",
-					    (long long int)task_pid(iter->task), code.exitcode);
+					    (long long int)pid, code.exitcode);
 					break;
 				case TASKSTATUS_KILLED:
 					log_info(TAG "task %lld killed with signal %d",
-					    (long long int)task_pid(iter->task), code.sigcode);
+					    (long long int)pid, code.sigcode);
 					break;
 				default:
 					break;
@@ -259,10 +261,11 @@
 			}
 
 			/* Remove that task and push to the outgoing queue. */
-			next = iter->next;
 			LL_DELETE(tasks, iter);
 			LL_APPEND(taskfinished, iter);
 		}
+
+		iter = next;
 	}
 
 	free(fds);
@@ -275,7 +278,6 @@
 ghost_all(void)
 {
 	struct taskentry *iter, *tmp;
-	time_t now;
 
 	LL_FOREACH_SAFE(tasks, iter, tmp) {
 		if (difftime(time(NULL), task_uptime(iter->task)) < sciworkerd.timeout)
@@ -304,20 +306,15 @@
 		.worker_name = worker.name
 	};
 	struct apic req;
-	json_t *doc;
-	int ret; 
-
-	doc = jobresult_to(&res, 1);
-	ret = apic_post(&req, doc, "%s/api/v1/jobs", sciworkerd.url);
-	json_decref(doc);
+	int ret = 0; 
 
-	if (ret)
+	if (apic_jobresult_add(&req, &res) < 0) {
 		log_warn(TAG "unable to publish task: %s", req.error);
-	else
-		log_info(TAG "task successfully published");
+		ret = -1;
+	}
 
+	apic_finish(&req);
 	jobresult_finish(&res);
-	apic_finish(&req);
 
 	return ret;
 }
@@ -358,6 +355,7 @@
 {
 	while (run) {
 		fetch_jobs();
+		start_all();
 		process_all();
 		ghost_all();
 		publish_all();