changeset 23:2cb228f23f53

misc: rework todo/jobs HTTP requests
author David Demelier <markand@malikania.fr>
date Thu, 21 Jul 2022 21:55:02 +0200
parents dd078aea5d02
children 34cbbd215ef7
files Makefile lib/apic.c lib/apic.h scictl/scictl.c scid/http.c scid/page-api-jobs.c scid/page-api-jobs.h sciworkerd/main.c sciworkerd/sciworkerd.c
diffstat 9 files changed, 152 insertions(+), 50 deletions(-) [+]
line wrap: on
line diff
--- a/Makefile	Thu Jul 21 20:23:22 2022 +0200
+++ b/Makefile	Thu Jul 21 21:55:02 2022 +0200
@@ -52,6 +52,7 @@
 SCID=                   scid/scid
 SCID_SRCS=              scid/http.c                     \
                         scid/main.c                     \
+                        scid/page-api-jobs.c            \
                         scid/page-api-projects.c        \
                         scid/page-api-todo.c            \
                         scid/page-api-workers.c         \
@@ -59,6 +60,7 @@
 SCID_OBJS=              ${SCID_SRCS:.c=.o}
 SCID_DEPS=              ${SCID_SRCS:.c=.d}
 
+SCIWORKERD=             sciworkerd/sciworkerd
 SCIWORKERD_SRCS=        sciworkerd/main.c \
                         sciworkerd/sciworkerd.c \
                         sciworkerd/task.c
--- a/lib/apic.c	Thu Jul 21 20:23:22 2022 +0200
+++ b/lib/apic.c	Thu Jul 21 21:55:02 2022 +0200
@@ -283,7 +283,7 @@
 }
 
 ssize_t
-apic_job_todo(struct apic *req, struct job *jobs, size_t jobsz, intmax_t worker_id)
+apic_job_todo(struct apic *req, struct job *jobs, size_t jobsz, const char *worker_name)
 {
 	assert(req);
 	assert(jobs);
@@ -294,7 +294,7 @@
 		.unpack = wrap_job_from
 	};
 
-	return get(req, &cv, "api/v1/jobs/%jd", worker_id);
+	return get(req, &cv, "api/v1/todo/%s", worker_name);
 }
 
 int
--- a/lib/apic.h	Thu Jul 21 20:23:22 2022 +0200
+++ b/lib/apic.h	Thu Jul 21 21:55:02 2022 +0200
@@ -2,7 +2,6 @@
 #define SCI_APIC_H
 
 #include <sys/types.h>
-#include <stdint.h>
 
 #include <jansson.h>
 
@@ -47,7 +46,7 @@
 apic_job_add(struct apic *, struct job *);
 
 ssize_t
-apic_job_todo(struct apic *, struct job *, size_t, intmax_t);
+apic_job_todo(struct apic *, struct job *, size_t, const char *);
 
 int
 apic_jobresult_add(struct apic *, struct jobresult *);
--- a/scictl/scictl.c	Thu Jul 21 20:23:22 2022 +0200
+++ b/scictl/scictl.c	Thu Jul 21 21:55:02 2022 +0200
@@ -133,7 +133,7 @@
 	if (argc < 2)
 		usage();
 
-	if ((jobsz = apic_job_todo(&req, jobs, UTIL_SIZE(jobs), toint(argv[1]))))
+	if ((jobsz = apic_job_todo(&req, jobs, UTIL_SIZE(jobs), argv[1])))
 		util_die("abort: %s\n", req.error);
 
 	for (size_t i = 0; i < jobsz; ++i) {
--- a/scid/http.c	Thu Jul 21 20:23:22 2022 +0200
+++ b/scid/http.c	Thu Jul 21 21:55:02 2022 +0200
@@ -27,10 +27,11 @@
 
 #include "http.h"
 #include "log.h"
-#include "page.h"
+#include "page-api-jobs.h"
 #include "page-api-projects.h"
 #include "page-api-todo.h"
 #include "page-api-workers.h"
+#include "page.h"
 
 enum page {
 	PAGE_API,
@@ -44,8 +45,9 @@
 		const char *prefix;
 		void (*handler)(struct kreq *);
 	} apis[] = {
+		{ "v1/jobs",            page_api_v1_jobs        },
+		{ "v1/projects",        page_api_v1_projects    },
 		{ "v1/todo",            page_api_v1_todo        },
-		{ "v1/projects",        page_api_v1_projects    },
 		{ "v1/workers",         page_api_v1_workers     },
 		{ NULL,                 NULL                    }
 	};
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/scid/page-api-jobs.c	Thu Jul 21 21:55:02 2022 +0200
@@ -0,0 +1,82 @@
+/*
+ * page-api-jobs.c -- /api/v?/jobs route
+ *
+ * Copyright (c) 2021 David Demelier <markand@malikania.fr>
+ *
+ * Permission to use, copy, modify, and/or distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+#include <sys/types.h>
+#include <assert.h>
+#include <stdarg.h>
+#include <stdint.h>
+#include <kcgi.h>
+
+#include "db.h"
+#include "log.h"
+#include "page.h"
+#include "types.h"
+
+static int
+save(const char *json)
+{
+	struct job job = {0};
+	int ret = -1;
+
+	json_t *doc;
+	json_error_t err;
+
+	if (!(doc = json_loads(json, 0, &err)))
+		log_warn("api/post: invalid JSON input: %s", err.text);
+	else if (job_from(&job, 1, doc) < 0)
+		log_warn("api/post: failed to decode parameters");
+	else if (db_job_add(&job) < 0)
+		log_warn("api/post: database save error");
+	else
+		ret = 0;
+
+	json_decref(doc);
+	job_finish(&job);
+
+	return ret;
+}
+
+static void
+post(struct kreq *r)
+{
+	if (r->fieldsz < 1)
+		page(r, NULL, KHTTP_400, KMIME_APP_JSON, NULL);
+	else if (save(r->fields[0].val) < 0)
+		page(r, NULL, KHTTP_500, KMIME_APP_JSON, NULL);
+	else {
+		khttp_head(r, kresps[KRESP_CONTENT_TYPE], "%s", kmimetypes[KMIME_APP_JSON]);
+		khttp_head(r, kresps[KRESP_STATUS], "%s", khttps[KHTTP_200]);
+		khttp_body(r);
+		khttp_free(r);
+	}
+}
+
+void
+page_api_v1_jobs(struct kreq *r)
+{
+	assert(r);
+
+	switch (r->method) {
+	case KMETHOD_POST:
+		post(r);
+		break;
+	default:
+		page(r, NULL, KHTTP_400, KMIME_APP_JSON, NULL);
+		break;
+	}
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/scid/page-api-jobs.h	Thu Jul 21 21:55:02 2022 +0200
@@ -0,0 +1,27 @@
+/*
+ * page-api-jobs.h -- /api/v?/jobs route
+ *
+ * Copyright (c) 2021 David Demelier <markand@malikania.fr>
+ *
+ * Permission to use, copy, modify, and/or distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+#ifndef SCI_PAGE_API_JOBS_H
+#define SCI_PAGE_API_JOBS_H
+
+struct kreq;
+
+void
+page_api_v1_jobs(struct kreq *);
+
+#endif /* !SCI_PAGE_API_JOBS_H */
--- a/sciworkerd/main.c	Thu Jul 21 20:23:22 2022 +0200
+++ b/sciworkerd/main.c	Thu Jul 21 21:55:02 2022 +0200
@@ -61,4 +61,8 @@
 			break;
 		}
 	}
+
+	sciworkerd_init();
+	sciworkerd_run();
+	sciworkerd_finish();
 }
--- a/sciworkerd/sciworkerd.c	Thu Jul 21 20:23:22 2022 +0200
+++ b/sciworkerd/sciworkerd.c	Thu Jul 21 21:55:02 2022 +0200
@@ -78,7 +78,7 @@
 }
 
 static void
-merge(const struct job *jobs, size_t jobsz)
+merge(struct job *jobs, size_t jobsz)
 {
 	size_t total = 0;
 
@@ -86,7 +86,8 @@
 		if (!pending(jobs[i].id)) {
 			queue(&jobs[i]);
 			total++;
-		}
+		} else
+			job_finish(&jobs[i]);
 	}
 
 	log_info(TAG "added %zu new pending tasks", total);
@@ -104,13 +105,12 @@
 	struct job todo[SCI_JOB_LIST_MAX];
 	ssize_t todosz;
 
-	if (!startup)
-		startup = time(NULL);
-
 	if (difftime((now = time(NULL)), startup) >= sciworkerd.fetchinterval) {
 		startup = now;
 
-		if ((todosz = apic_job_todo(&req, todo, UTIL_SIZE(todo), worker.id)) < 0)
+		log_info(TAG "fetching jobs");
+
+		if ((todosz = apic_job_todo(&req, todo, UTIL_SIZE(todo), worker.name)) < 0)
 			log_warn(TAG "unable to fetch jobs: %s", req.error);
 		else
 			merge(todo, todosz);
@@ -127,40 +127,15 @@
 {
 	struct apic req;
 
-	util_strlcpy(&worker.name, sciworkerd.name);
+	if (apic_worker_find(&req, &worker, sciworkerd.name) < 0)
+		log_die(TAG "unable to fetch worker info: %s", req.error);
 
-	if (apic_worker_find(&req, &worker) < 0)
-		log_die(TAG, "unable to fetch worker info: %s", req.error);
-
-	log_info("worker id: %d", worker.id);
 	log_info("worker name: %s", worker.name);
 	log_info("worker description: %s", worker.desc);
 
 	apic_finish(&req);
 }
 
-/*
- * Fetch information about a project.
- */
-static int
-fetch_project(struct project *project, int id)
-{
-	struct apic req;
-
-	if (apic_project_find_id(&req, project, id) < 0)
-		return -1;
-#if 0
-	if (apic_get(&req, "%s/api/v1/projects/%d", id) < 0)
-		return log_warn(TAG "unable to fetch project info: %s", req.error), -1;
-	if (!req.doc)
-		return log_warn(TAG "empty project response"), -1;
-	if (project_from(project, 1, req.doc) < 0)
-		return log_warn(TAG "unable to parse project: %s", strerror(errno)), -1;
-#endif
-
-	return 0;
-}
-
 static inline size_t
 count(const struct taskentry *head)
 {
@@ -180,20 +155,26 @@
 static int
 start(struct taskentry *entry)
 {
-	struct apic;
+	struct apic req;
 	struct project project;
 	pid_t pid;
+	int ret = -1;
 
-	if (apic_project_find_id(&project, entry->job.project_id) < 0)
+	if (apic_project_find(&req, &project, entry->job.project_name) < 0)
 		return log_warn(TAG "unable to fetch project, dropping task"), -1;
+
 	if (task_setup(entry->task, project.script) < 0)
-		return log_warn(TAG "unable to setup script code: %s, dropping task", strerror(errno)), -1;
-	if ((pid = task_start(entry->task)) < 0)
-		return log_warn(TAG "unable to spawn task process: %s", strerror(errno)), -1;
+		log_warn(TAG "unable to setup script code: %s, dropping task", strerror(errno));
+	else if ((pid = task_start(entry->task)) < 0)
+		log_warn(TAG "unable to spawn task process: %s", strerror(errno));
+	else {
+		log_info(TAG "task %lld spawned", (long long int)pid);
+		ret = 0;
+	}
 
-	log_info(TAG "task %lld spawned", (long long int)pid);
+	project_finish(&project);
 
-	return 0;
+	return ret;
 }
 
 static inline void
@@ -319,10 +300,10 @@
 	struct jobresult res = {
 		.job_id = iter->job.id,
 		.exitcode = code.exitcode,
-		.log = task_console(iter->task),
-		.worker_id = worker.id
+		.log = util_strdup(task_console(iter->task)),
+		.worker_name = worker.name
 	};
-	struct apicreq req;
+	struct apic req;
 	json_t *doc;
 	int ret; 
 
@@ -335,6 +316,9 @@
 	else
 		log_info(TAG "task successfully published");
 
+	jobresult_finish(&res);
+	apic_finish(&req);
+
 	return ret;
 }
 
@@ -361,6 +345,8 @@
 	sigemptyset(&sa.sa_mask);
 	sa.sa_handler = stop;
 
+	util_strlcpy(apiconf.baseurl, sciworkerd.url, sizeof (apiconf.baseurl));
+
 	if (sigaction(SIGINT, &sa, NULL) < 0 || sigaction(SIGTERM, &sa, NULL) < 0)
 		log_die(TAG "sigaction: %s", strerror(errno));