comparison sciworkerd/sciworkerd.c @ 27:dae2de19ca5d

misc: switch to JSON everywhere
author David Demelier <markand@malikania.fr>
date Wed, 03 Aug 2022 15:18:09 +0200
parents 34cbbd215ef7
children e52c762d8ba8
comparison
equal deleted inserted replaced
26:7e10cace67a3 27:dae2de19ca5d
1 #include <errno.h> 1 #include <errno.h>
2 #include <poll.h> 2 #include <poll.h>
3 #include <signal.h> 3 #include <signal.h>
4 #include <stdint.h>
4 #include <string.h> 5 #include <string.h>
5 #include <time.h> 6 #include <time.h>
6 7
7 #include <utlist.h> 8 #include <utlist.h>
8 9
9 #include "apic.h" 10 #include "apic.h"
10 #include "log.h" 11 #include "log.h"
11 #include "sciworkerd.h" 12 #include "sciworkerd.h"
12 #include "task.h" 13 #include "task.h"
13 #include "types.h"
14 #include "util.h" 14 #include "util.h"
15 15
16 #define TAG "sciworkerd: " 16 #define TAG "sciworkerd: "
17 17
18 struct taskentry { 18 struct taskentry {
19 intmax_t job_id;
20 char *tag;
21 char *project_name;
19 struct task *task; 22 struct task *task;
20 struct job job;
21 struct taskentry *next; 23 struct taskentry *next;
22 }; 24 };
23 25
24 static struct taskentry *taskpending; 26 static struct taskentry *taskpending;
25 static struct taskentry *tasks; 27 static struct taskentry *tasks;
26 static struct taskentry *taskfinished; 28 static struct taskentry *taskfinished;
27 static struct worker worker;
28 static int run = 1; 29 static int run = 1;
29 30
30 struct sciworkerd sciworkerd = { 31 struct sciworkerd sciworkerd = {
31 .fetchinterval = 300, 32 .fetchinterval = 300,
32 .maxjobs = 4, 33 .maxjobs = 4,
51 log_info(TAG "exiting on signal %d", sign); 52 log_info(TAG "exiting on signal %d", sign);
52 run = 0; 53 run = 0;
53 } 54 }
54 55
55 static inline int 56 static inline int
56 pending(int id) 57 pending(intmax_t job_id)
57 { 58 {
58 const struct taskentry *iter; 59 const struct taskentry *iter;
59 60
60 LL_FOREACH(taskpending, iter) 61 LL_FOREACH(taskpending, iter)
61 if (iter->job.id == id) 62 if (iter->job_id == job_id)
62 return 1; 63 return 1;
63 64
64 return 0; 65 return 0;
65 } 66 }
66 67
67 static inline void 68 static inline void
68 queue(const struct job *job) 69 queue(intmax_t id, const char *tag, const char *project_name)
69 { 70 {
70 struct taskentry *tk; 71 struct taskentry *tk;
71 72
72 log_info(TAG "queued job build (%d) for tag %s", job->id, job->tag); 73 log_info(TAG "queued job build (%d) for tag %s", id, tag);
73 74
74 tk = util_calloc(1, sizeof (*tk)); 75 tk = util_calloc(1, sizeof (*tk));
75 tk->task = task_new(job->tag); 76 tk->job_id = id;
76 memcpy(&tk->job, job, sizeof (*job)); 77 tk->tag = util_strdup(tag);
78 tk->project_name = util_strdup(project_name);
79 tk->task = task_new(tag);
77 LL_APPEND(taskpending, tk); 80 LL_APPEND(taskpending, tk);
78 } 81 }
79 82
80 static void 83 static void
81 merge(struct job *jobs, size_t jobsz) 84 merge(json_t *jobs)
82 { 85 {
83 size_t total = 0; 86 json_int_t id;
84 87 json_t *val;
85 for (size_t i = 0; i < jobsz; ++i) { 88 const char *tag, *project_name;
86 if (!pending(jobs[i].id)) { 89 size_t total = 0, i;
87 queue(&jobs[i]); 90 int parse;
91
92 json_array_foreach(jobs, i, val) {
93 parse = json_unpack(val, "{sI ss ss}",
94 "id", &id,
95 "tag", &tag,
96 "project_name", &project_name
97 );
98
99 if (parse < 0) {
100 log_warn(TAG "unable to parse job");
101 continue;
102 }
103
104 if (!pending(id)) {
105 queue(id, tag, project_name);
88 total++; 106 total++;
89 } else 107 }
90 job_finish(&jobs[i]);
91 } 108 }
92 109
93 log_info(TAG "added %zu new pending tasks", total); 110 log_info(TAG "added %zu new pending tasks", total);
94 } 111 }
95 112
100 fetch_jobs(void) 117 fetch_jobs(void)
101 { 118 {
102 static time_t startup; 119 static time_t startup;
103 time_t now; 120 time_t now;
104 struct apic req; 121 struct apic req;
105 struct job todo[SCI_JOB_LIST_MAX]; 122 json_t *jobs;
106 ssize_t todosz;
107 123
108 if (difftime((now = time(NULL)), startup) >= sciworkerd.fetchinterval) { 124 if (difftime((now = time(NULL)), startup) >= sciworkerd.fetchinterval) {
109 startup = now; 125 startup = now;
110
111 log_info(TAG "fetching jobs"); 126 log_info(TAG "fetching jobs");
112 127
113 if ((todosz = apic_job_todo(&req, todo, UTIL_SIZE(todo), worker.name)) < 0) 128 if (!(jobs = apic_job_todo(&req, sciworkerd.name)))
114 log_warn(TAG "unable to fetch jobs: %s", req.error); 129 log_warn(TAG "unable to fetch jobs: %s", req.error);
115 else 130 else {
116 merge(todo, todosz); 131 merge(jobs);
117 132 json_decref(jobs);
118 apic_finish(&req); 133 }
119 } 134 }
120 }
121
122 /*
123 * Fetch information about myself.
124 */
125 static void
126 fetch_worker(void)
127 {
128 struct apic req;
129
130 if (apic_worker_find(&req, &worker, sciworkerd.name) < 0)
131 log_die(TAG "unable to fetch worker info: %s", req.error);
132
133 log_info("sciworkerd: worker %s (%s)", worker.name, worker.desc);
134 apic_finish(&req);
135 } 135 }
136 136
137 static inline size_t 137 static inline size_t
138 count(const struct taskentry *head) 138 count(const struct taskentry *head)
139 { 139 {
152 */ 152 */
153 static int 153 static int
154 start(struct taskentry *entry) 154 start(struct taskentry *entry)
155 { 155 {
156 struct apic req; 156 struct apic req;
157 struct project project; 157 json_t *doc;
158 const char *script;
158 pid_t pid; 159 pid_t pid;
159 int ret = -1; 160 int ret = -1;
160 161
161 if (apic_project_find(&req, &project, entry->job.project_name) < 0) 162 if (!(doc = apic_project_find(&req, entry->project_name)))
162 return log_warn(TAG "unable to fetch project, dropping task"), -1; 163 return log_warn(TAG "unable to fetch project, dropping task"), -1;
163 164 if (json_unpack(doc, "{ss}", "script", &script) < 0) {
164 if (task_setup(entry->task, project.script) < 0) 165 json_decref(doc);
166 return log_warn(TAG "invalid project JSON object"), -1;
167 }
168
169 if (task_setup(entry->task, script) < 0)
165 log_warn(TAG "unable to setup script code: %s, dropping task", strerror(errno)); 170 log_warn(TAG "unable to setup script code: %s, dropping task", strerror(errno));
166 else if ((pid = task_start(entry->task)) < 0) 171 else if ((pid = task_start(entry->task)) < 0)
167 log_warn(TAG "unable to spawn task process: %s", strerror(errno)); 172 log_warn(TAG "unable to spawn task process: %s", strerror(errno));
168 else { 173 else {
169 log_info(TAG "task %lld spawned", (long long int)pid); 174 log_info(TAG "task %lld spawned", (long long int)pid);
170 ret = 0; 175 ret = 0;
171 } 176 }
172 177
173 project_finish(&project); 178 json_decref(doc);
174 179
175 return ret; 180 return ret;
176 } 181 }
177 182
178 static inline void 183 static inline void
295 } 300 }
296 301
297 static int 302 static int
298 publish(struct taskentry *iter) 303 publish(struct taskentry *iter)
299 { 304 {
300 // TODO: add sigcode.
301 struct taskcode code = task_code(iter->task);
302 struct jobresult res = {
303 .job_id = iter->job.id,
304 .exitcode = code.exitcode,
305 .log = util_strdup(task_console(iter->task)),
306 .worker_name = worker.name
307 };
308 struct apic req; 305 struct apic req;
306 json_t *obj;
309 int ret = 0; 307 int ret = 0;
310 308
311 if (apic_jobresult_add(&req, &res) < 0) { 309 obj = json_pack("{sI ss ss si si}",
310 "job_id", iter->job_id,
311 "worker_name", sciworkerd.name,
312 "console", task_console(iter->task),
313 "exitcode", task_code(iter->task).exitcode,
314 "sigcode", task_code(iter->task).sigcode
315 );
316
317 if (apic_jobresult_add(&req, obj) < 0) {
312 log_warn(TAG "unable to publish task: %s", req.error); 318 log_warn(TAG "unable to publish task: %s", req.error);
313 ret = -1; 319 ret = -1;
314 } 320 }
315 321
316 apic_finish(&req); 322 json_decref(obj);
317 jobresult_finish(&res);
318 323
319 return ret; 324 return ret;
320 } 325 }
321 326
322 static void 327 static void
337 { 342 {
338 struct sigaction sa = {0}; 343 struct sigaction sa = {0};
339 344
340 log_open("sigworkerd"); 345 log_open("sigworkerd");
341 346
347 if (strlen(sciworkerd.name) == 0)
348 log_die(TAG "no worker name defined");
349
342 sigemptyset(&sa.sa_mask); 350 sigemptyset(&sa.sa_mask);
343 sa.sa_handler = stop; 351 sa.sa_handler = stop;
344 352
345 util_strlcpy(apiconf.baseurl, sciworkerd.url, sizeof (apiconf.baseurl)); 353 util_strlcpy(apiconf.baseurl, sciworkerd.url, sizeof (apiconf.baseurl));
346 354
347 if (sigaction(SIGINT, &sa, NULL) < 0 || sigaction(SIGTERM, &sa, NULL) < 0) 355 if (sigaction(SIGINT, &sa, NULL) < 0 || sigaction(SIGTERM, &sa, NULL) < 0)
348 log_die(TAG "sigaction: %s", strerror(errno)); 356 log_die(TAG "sigaction: %s", strerror(errno));
349
350 fetch_worker();
351 } 357 }
352 358
353 void 359 void
354 sciworkerd_run(void) 360 sciworkerd_run(void)
355 { 361 {