diff sciworkerd/sciworkerd.c @ 23:2cb228f23f53

misc: rework todo/jobs HTTP requests
author David Demelier <markand@malikania.fr>
date Thu, 21 Jul 2022 21:55:02 +0200
parents f98ea578b1ef
children 34cbbd215ef7
line wrap: on
line diff
--- a/sciworkerd/sciworkerd.c	Thu Jul 21 20:23:22 2022 +0200
+++ b/sciworkerd/sciworkerd.c	Thu Jul 21 21:55:02 2022 +0200
@@ -78,7 +78,7 @@
 }
 
 static void
-merge(const struct job *jobs, size_t jobsz)
+merge(struct job *jobs, size_t jobsz)
 {
 	size_t total = 0;
 
@@ -86,7 +86,8 @@
 		if (!pending(jobs[i].id)) {
 			queue(&jobs[i]);
 			total++;
-		}
+		} else
+			job_finish(&jobs[i]);
 	}
 
 	log_info(TAG "added %zu new pending tasks", total);
@@ -104,13 +105,12 @@
 	struct job todo[SCI_JOB_LIST_MAX];
 	ssize_t todosz;
 
-	if (!startup)
-		startup = time(NULL);
-
 	if (difftime((now = time(NULL)), startup) >= sciworkerd.fetchinterval) {
 		startup = now;
 
-		if ((todosz = apic_job_todo(&req, todo, UTIL_SIZE(todo), worker.id)) < 0)
+		log_info(TAG "fetching jobs");
+
+		if ((todosz = apic_job_todo(&req, todo, UTIL_SIZE(todo), worker.name)) < 0)
 			log_warn(TAG "unable to fetch jobs: %s", req.error);
 		else
 			merge(todo, todosz);
@@ -127,40 +127,15 @@
 {
 	struct apic req;
 
-	util_strlcpy(&worker.name, sciworkerd.name);
+	if (apic_worker_find(&req, &worker, sciworkerd.name) < 0)
+		log_die(TAG "unable to fetch worker info: %s", req.error);
 
-	if (apic_worker_find(&req, &worker) < 0)
-		log_die(TAG, "unable to fetch worker info: %s", req.error);
-
-	log_info("worker id: %d", worker.id);
 	log_info("worker name: %s", worker.name);
 	log_info("worker description: %s", worker.desc);
 
 	apic_finish(&req);
 }
 
-/*
- * Fetch information about a project.
- */
-static int
-fetch_project(struct project *project, int id)
-{
-	struct apic req;
-
-	if (apic_project_find_id(&req, project, id) < 0)
-		return -1;
-#if 0
-	if (apic_get(&req, "%s/api/v1/projects/%d", id) < 0)
-		return log_warn(TAG "unable to fetch project info: %s", req.error), -1;
-	if (!req.doc)
-		return log_warn(TAG "empty project response"), -1;
-	if (project_from(project, 1, req.doc) < 0)
-		return log_warn(TAG "unable to parse project: %s", strerror(errno)), -1;
-#endif
-
-	return 0;
-}
-
 static inline size_t
 count(const struct taskentry *head)
 {
@@ -180,20 +155,26 @@
 static int
 start(struct taskentry *entry)
 {
-	struct apic;
+	struct apic req;
 	struct project project;
 	pid_t pid;
+	int ret = -1;
 
-	if (apic_project_find_id(&project, entry->job.project_id) < 0)
+	if (apic_project_find(&req, &project, entry->job.project_name) < 0)
 		return log_warn(TAG "unable to fetch project, dropping task"), -1;
+
 	if (task_setup(entry->task, project.script) < 0)
-		return log_warn(TAG "unable to setup script code: %s, dropping task", strerror(errno)), -1;
-	if ((pid = task_start(entry->task)) < 0)
-		return log_warn(TAG "unable to spawn task process: %s", strerror(errno)), -1;
+		log_warn(TAG "unable to setup script code: %s, dropping task", strerror(errno));
+	else if ((pid = task_start(entry->task)) < 0)
+		log_warn(TAG "unable to spawn task process: %s", strerror(errno));
+	else {
+		log_info(TAG "task %lld spawned", (long long int)pid);
+		ret = 0;
+	}
 
-	log_info(TAG "task %lld spawned", (long long int)pid);
+	project_finish(&project);
 
-	return 0;
+	return ret;
 }
 
 static inline void
@@ -319,10 +300,10 @@
 	struct jobresult res = {
 		.job_id = iter->job.id,
 		.exitcode = code.exitcode,
-		.log = task_console(iter->task),
-		.worker_id = worker.id
+		.log = util_strdup(task_console(iter->task)),
+		.worker_name = worker.name
 	};
-	struct apicreq req;
+	struct apic req;
 	json_t *doc;
 	int ret; 
 
@@ -335,6 +316,9 @@
 	else
 		log_info(TAG "task successfully published");
 
+	jobresult_finish(&res);
+	apic_finish(&req);
+
 	return ret;
 }
 
@@ -361,6 +345,8 @@
 	sigemptyset(&sa.sa_mask);
 	sa.sa_handler = stop;
 
+	util_strlcpy(apiconf.baseurl, sciworkerd.url, sizeof (apiconf.baseurl));
+
 	if (sigaction(SIGINT, &sa, NULL) < 0 || sigaction(SIGTERM, &sa, NULL) < 0)
 		log_die(TAG "sigaction: %s", strerror(errno));