Mercurial > sci
comparison sciworkerd/sciworkerd.c @ 24:34cbbd215ef7
misc: add basic support for jobresults
author | David Demelier <markand@malikania.fr> |
---|---|
date | Mon, 25 Jul 2022 21:11:23 +0200 |
parents | 2cb228f23f53 |
children | dae2de19ca5d |
comparison
Legend: equal | deleted | inserted | replaced
23:2cb228f23f53 | 24:34cbbd215ef7 |
---|---|
11 #include "sciworkerd.h" | 11 #include "sciworkerd.h" |
12 #include "task.h" | 12 #include "task.h" |
13 #include "types.h" | 13 #include "types.h" |
14 #include "util.h" | 14 #include "util.h" |
15 | 15 |
16 #define TAG "sigworkerd: " | 16 #define TAG "sciworkerd: " |
17 | 17 |
18 struct taskentry { | 18 struct taskentry { |
19 struct task *task; | 19 struct task *task; |
20 struct job job; | 20 struct job job; |
21 struct taskentry *next; | 21 struct taskentry *next; |
46 } | 46 } |
47 | 47 |
48 static void | 48 static void |
49 stop(int sign) | 49 stop(int sign) |
50 { | 50 { |
51 log_info(TAG "exiting on signal %d\n", sign); | 51 log_info(TAG "exiting on signal %d", sign); |
52 run = 0; | 52 run = 0; |
53 } | 53 } |
54 | 54 |
55 static inline int | 55 static inline int |
56 pending(int id) | 56 pending(int id) |
67 static inline void | 67 static inline void |
68 queue(const struct job *job) | 68 queue(const struct job *job) |
69 { | 69 { |
70 struct taskentry *tk; | 70 struct taskentry *tk; |
71 | 71 |
72 log_info(TAG "queued job build (%d) for tag %s\n", job->id, job->tag); | 72 log_info(TAG "queued job build (%d) for tag %s", job->id, job->tag); |
73 | 73 |
74 tk = util_calloc(1, sizeof (*tk)); | 74 tk = util_calloc(1, sizeof (*tk)); |
75 tk->task = task_new(job->tag); | 75 tk->task = task_new(job->tag); |
76 memcpy(&tk->job, job, sizeof (*job)); | 76 memcpy(&tk->job, job, sizeof (*job)); |
77 LL_APPEND(taskpending, tk); | 77 LL_APPEND(taskpending, tk); |
80 static void | 80 static void |
81 merge(struct job *jobs, size_t jobsz) | 81 merge(struct job *jobs, size_t jobsz) |
82 { | 82 { |
83 size_t total = 0; | 83 size_t total = 0; |
84 | 84 |
85 for (ssize_t i = 0; i < jobsz; ++i) { | 85 for (size_t i = 0; i < jobsz; ++i) { |
86 if (!pending(jobs[i].id)) { | 86 if (!pending(jobs[i].id)) { |
87 queue(&jobs[i]); | 87 queue(&jobs[i]); |
88 total++; | 88 total++; |
89 } else | 89 } else |
90 job_finish(&jobs[i]); | 90 job_finish(&jobs[i]); |
128 struct apic req; | 128 struct apic req; |
129 | 129 |
130 if (apic_worker_find(&req, &worker, sciworkerd.name) < 0) | 130 if (apic_worker_find(&req, &worker, sciworkerd.name) < 0) |
131 log_die(TAG "unable to fetch worker info: %s", req.error); | 131 log_die(TAG "unable to fetch worker info: %s", req.error); |
132 | 132 |
133 log_info("worker name: %s", worker.name); | 133 log_info("sciworkerd: worker %s (%s)", worker.name, worker.desc); |
134 log_info("worker description: %s", worker.desc); | |
135 | |
136 apic_finish(&req); | 134 apic_finish(&req); |
137 } | 135 } |
138 | 136 |
139 static inline size_t | 137 static inline size_t |
140 count(const struct taskentry *head) | 138 count(const struct taskentry *head) |
189 start_all(void) | 187 start_all(void) |
190 { | 188 { |
191 size_t running = count(tasks); | 189 size_t running = count(tasks); |
192 struct taskentry *entry; | 190 struct taskentry *entry; |
193 | 191 |
194 while (running-- > 0 && (entry = taskpending)) { | 192 while (running < sciworkerd.maxjobs && (entry = taskpending)) { |
195 if (start(entry) < 0) | 193 if (start(entry) < 0) |
196 delete(entry); | 194 delete(entry); |
197 else { | 195 else { |
196 running++; | |
198 LL_DELETE(taskpending, entry); | 197 LL_DELETE(taskpending, entry); |
199 LL_APPEND(tasks, entry); | 198 LL_APPEND(tasks, entry); |
200 } | 199 } |
201 } | 200 } |
202 } | 201 } |
206 { | 205 { |
207 struct taskentry *iter, *next; | 206 struct taskentry *iter, *next; |
208 struct taskcode code; | 207 struct taskcode code; |
209 struct pollfd *fds; | 208 struct pollfd *fds; |
210 size_t fdsz, i = 0; | 209 size_t fdsz, i = 0; |
210 pid_t pid; | |
211 int ret; | 211 int ret; |
212 | 212 |
213 /* First, read every pipes. */ | 213 /* First, read every pipes. */ |
214 if (!(fdsz = count(tasks))) | 214 if (!(fdsz = count(tasks))) |
215 return; | 215 return; |
237 log_warn(TAG "task kill error: %s", strerror(errno)); | 237 log_warn(TAG "task kill error: %s", strerror(errno)); |
238 } | 238 } |
239 | 239 |
240 /* Now wait for the task to complete. */ | 240 /* Now wait for the task to complete. */ |
241 if (ret <= 0) { | 241 if (ret <= 0) { |
242 pid = task_pid(iter->task); | |
243 | |
242 if (task_wait(iter->task) < 0) | 244 if (task_wait(iter->task) < 0) |
243 log_warn(TAG "task wait error: %s", strerror(errno)); | 245 log_warn(TAG "task wait error: %s", strerror(errno)); |
244 else { | 246 else { |
245 code = task_code(iter->task); | 247 code = task_code(iter->task); |
246 | 248 |
247 switch (task_status(iter->task)) { | 249 switch (task_status(iter->task)) { |
248 case TASKSTATUS_EXITED: | 250 case TASKSTATUS_EXITED: |
249 log_info(TAG "task %lld exited with code %d", | 251 log_info(TAG "task %lld exited with code %d", |
250 (long long int)task_pid(iter->task), code.exitcode); | 252 (long long int)pid, code.exitcode); |
251 break; | 253 break; |
252 case TASKSTATUS_KILLED: | 254 case TASKSTATUS_KILLED: |
253 log_info(TAG "task %lld killed with signal %d", | 255 log_info(TAG "task %lld killed with signal %d", |
254 (long long int)task_pid(iter->task), code.sigcode); | 256 (long long int)pid, code.sigcode); |
255 break; | 257 break; |
256 default: | 258 default: |
257 break; | 259 break; |
258 } | 260 } |
259 } | 261 } |
260 | 262 |
261 /* Remove that task and push to the outgoing queue. */ | 263 /* Remove that task and push to the outgoing queue. */ |
262 next = iter->next; | |
263 LL_DELETE(tasks, iter); | 264 LL_DELETE(tasks, iter); |
264 LL_APPEND(taskfinished, iter); | 265 LL_APPEND(taskfinished, iter); |
265 } | 266 } |
267 | |
268 iter = next; | |
266 } | 269 } |
267 | 270 |
268 free(fds); | 271 free(fds); |
269 } | 272 } |
270 | 273 |
273 */ | 276 */ |
274 static void | 277 static void |
275 ghost_all(void) | 278 ghost_all(void) |
276 { | 279 { |
277 struct taskentry *iter, *tmp; | 280 struct taskentry *iter, *tmp; |
278 time_t now; | |
279 | 281 |
280 LL_FOREACH_SAFE(tasks, iter, tmp) { | 282 LL_FOREACH_SAFE(tasks, iter, tmp) { |
281 if (difftime(time(NULL), task_uptime(iter->task)) < sciworkerd.timeout) | 283 if (difftime(time(NULL), task_uptime(iter->task)) < sciworkerd.timeout) |
282 continue; | 284 continue; |
283 | 285 |
302 .exitcode = code.exitcode, | 304 .exitcode = code.exitcode, |
303 .log = util_strdup(task_console(iter->task)), | 305 .log = util_strdup(task_console(iter->task)), |
304 .worker_name = worker.name | 306 .worker_name = worker.name |
305 }; | 307 }; |
306 struct apic req; | 308 struct apic req; |
307 json_t *doc; | 309 int ret = 0; |
308 int ret; | 310 |
309 | 311 if (apic_jobresult_add(&req, &res) < 0) { |
310 doc = jobresult_to(&res, 1); | |
311 ret = apic_post(&req, doc, "%s/api/v1/jobs", sciworkerd.url); | |
312 json_decref(doc); | |
313 | |
314 if (ret) | |
315 log_warn(TAG "unable to publish task: %s", req.error); | 312 log_warn(TAG "unable to publish task: %s", req.error); |
316 else | 313 ret = -1; |
317 log_info(TAG "task successfully published"); | 314 } |
318 | 315 |
316 apic_finish(&req); | |
319 jobresult_finish(&res); | 317 jobresult_finish(&res); |
320 apic_finish(&req); | |
321 | 318 |
322 return ret; | 319 return ret; |
323 } | 320 } |
324 | 321 |
325 static void | 322 static void |
356 void | 353 void |
357 sciworkerd_run(void) | 354 sciworkerd_run(void) |
358 { | 355 { |
359 while (run) { | 356 while (run) { |
360 fetch_jobs(); | 357 fetch_jobs(); |
358 start_all(); | |
361 process_all(); | 359 process_all(); |
362 ghost_all(); | 360 ghost_all(); |
363 publish_all(); | 361 publish_all(); |
364 } | 362 } |
365 } | 363 } |