comparison sciworkerd/sciworkerd.c @ 24:34cbbd215ef7

misc: add basic support for jobresults
author David Demelier <markand@malikania.fr>
date Mon, 25 Jul 2022 21:11:23 +0200
parents 2cb228f23f53
children dae2de19ca5d
comparing 23:2cb228f23f53 with 24:34cbbd215ef7
@@ -11,11 +11,11 @@
 #include "sciworkerd.h"
 #include "task.h"
 #include "types.h"
 #include "util.h"
 
-#define TAG "sigworkerd: "
+#define TAG "sciworkerd: "
 
 struct taskentry {
     struct task *task;
     struct job job;
     struct taskentry *next;
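The TAG macro is pasted in front of every format string through C's adjacent string literal concatenation, which is why the typo in the prefix showed up in each log line. A minimal standalone sketch of that mechanism (fprintf stands in for the project's log_info):

    #include <stdio.h>

    #define TAG "sciworkerd: "

    int
    main(void)
    {
        /* Adjacent string literals are concatenated at compile time, so
         * TAG "..." becomes one string: "sciworkerd: exiting on signal %d\n". */
        fprintf(stderr, TAG "exiting on signal %d\n", 15);
        return 0;
    }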
@@ -46,11 +46,11 @@
 }
 
 static void
 stop(int sign)
 {
-    log_info(TAG "exiting on signal %d\n", sign);
+    log_info(TAG "exiting on signal %d", sign);
     run = 0;
 }
 
 static inline int
 pending(int id)
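stop() only clears the run flag, so the loop in sciworkerd_run() finishes its current iteration and exits. A small sketch of wiring such a handler, assuming run is a volatile sig_atomic_t (its real declaration is outside this hunk):

    #include <signal.h>
    #include <stdio.h>
    #include <string.h>
    #include <unistd.h>

    static volatile sig_atomic_t run = 1;   /* assumed flag type, cleared by the handler */

    static void
    stop(int sign)
    {
        (void)sign;
        run = 0;
    }

    int
    main(void)
    {
        struct sigaction sa;

        memset(&sa, 0, sizeof (sa));
        sa.sa_handler = stop;
        sigemptyset(&sa.sa_mask);
        sigaction(SIGINT, &sa, NULL);
        sigaction(SIGTERM, &sa, NULL);

        while (run)
            pause();        /* sleep until a signal arrives */

        puts("exiting");
        return 0;
    }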
@@ -67,11 +67,11 @@
 static inline void
 queue(const struct job *job)
 {
     struct taskentry *tk;
 
-    log_info(TAG "queued job build (%d) for tag %s\n", job->id, job->tag);
+    log_info(TAG "queued job build (%d) for tag %s", job->id, job->tag);
 
     tk = util_calloc(1, sizeof (*tk));
     tk->task = task_new(job->tag);
     memcpy(&tk->job, job, sizeof (*job));
     LL_APPEND(taskpending, tk);
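The pending, running and finished queues are plain singly linked lists driven by LL_* macros, which look like the helpers from utlist.h (an assumption, the include is not part of this hunk). A self-contained example of the same append/iterate/delete pattern:

    #include <stdio.h>
    #include <stdlib.h>

    #include <utlist.h>     /* assumption: the LL_* macros come from utlist */

    struct taskentry {
        int id;
        struct taskentry *next;     /* utlist expects a field named next */
    };

    int
    main(void)
    {
        struct taskentry *taskpending = NULL, *tk, *tmp;

        for (int i = 0; i < 3; ++i) {
            tk = calloc(1, sizeof (*tk));
            tk->id = i;
            LL_APPEND(taskpending, tk);
        }

        LL_FOREACH_SAFE(taskpending, tk, tmp) {
            printf("pending task %d\n", tk->id);
            LL_DELETE(taskpending, tk);
            free(tk);
        }

        return 0;
    }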
@@ -80,11 +80,11 @@
 static void
 merge(struct job *jobs, size_t jobsz)
 {
     size_t total = 0;
 
-    for (ssize_t i = 0; i < jobsz; ++i) {
+    for (size_t i = 0; i < jobsz; ++i) {
         if (!pending(jobs[i].id)) {
             queue(&jobs[i]);
             total++;
         } else
             job_finish(&jobs[i]);
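The loop index switches from ssize_t to size_t so the comparison against the size_t parameter jobsz is no longer a signed/unsigned mismatch. The same idiom in isolation:

    #include <stddef.h>
    #include <stdio.h>

    static void
    print_ids(const int *ids, size_t idsz)
    {
        /* Matching the index type to the size type avoids -Wsign-compare
         * warnings and implicit conversions in the test i < idsz. */
        for (size_t i = 0; i < idsz; ++i)
            printf("%d\n", ids[i]);
    }

    int
    main(void)
    {
        int ids[] = { 1, 2, 3 };

        print_ids(ids, sizeof (ids) / sizeof (ids[0]));
        return 0;
    }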
@@ -128,13 +128,11 @@
     struct apic req;
 
     if (apic_worker_find(&req, &worker, sciworkerd.name) < 0)
         log_die(TAG "unable to fetch worker info: %s", req.error);
 
-    log_info("worker name: %s", worker.name);
-    log_info("worker description: %s", worker.desc);
-
+    log_info("sciworkerd: worker %s (%s)", worker.name, worker.desc);
     apic_finish(&req);
 }
 
 static inline size_t
 count(const struct taskentry *head)
@@ -189,14 +187,15 @@
 start_all(void)
 {
     size_t running = count(tasks);
     struct taskentry *entry;
 
-    while (running-- > 0 && (entry = taskpending)) {
+    while (running < sciworkerd.maxjobs && (entry = taskpending)) {
         if (start(entry) < 0)
             delete(entry);
         else {
+            running++;
             LL_DELETE(taskpending, entry);
             LL_APPEND(tasks, entry);
         }
     }
 }
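The rewritten loop keeps starting pending tasks only while the number of running tasks stays below sciworkerd.maxjobs and counts each successful start, instead of the old running-- countdown which allowed at most as many starts as there were tasks already running. A small stand-alone model of the capacity check (MAXJOBS and the counters are placeholders):

    #include <stddef.h>
    #include <stdio.h>

    #define MAXJOBS 4       /* stands in for sciworkerd.maxjobs */

    int
    main(void)
    {
        size_t running = 1;     /* e.g. one task already in flight */
        size_t pending = 10;    /* tasks waiting in the queue */

        /* Fill up to the capacity, never beyond it. */
        while (running < MAXJOBS && pending > 0) {
            pending--;
            running++;      /* counted only once a task actually started */
            printf("started task, now %zu running, %zu pending\n",
                running, pending);
        }

        return 0;
    }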
@@ -206,10 +205,11 @@
 {
     struct taskentry *iter, *next;
     struct taskcode code;
     struct pollfd *fds;
     size_t fdsz, i = 0;
+    pid_t pid;
     int ret;
 
     /* First, read every pipes. */
     if (!(fdsz = count(tasks)))
         return;
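process_all() allocates one struct pollfd per running task and drains the task pipes before waiting on the processes. A minimal poll(2) sketch of that shape, with a local pipe standing in for a task's output descriptor:

    #include <poll.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <unistd.h>

    int
    main(void)
    {
        char buf[128];
        int fd[2];
        struct pollfd *fds;
        size_t fdsz = 1;
        ssize_t nr;

        /* One pipe stands in for a task's output descriptor. */
        if (pipe(fd) < 0 || write(fd[1], "hello\n", 6) < 0)
            return 1;

        /* Same shape as process_all(): one pollfd per running task. */
        fds = calloc(fdsz, sizeof (*fds));
        fds[0].fd = fd[0];
        fds[0].events = POLLIN;

        if (poll(fds, fdsz, 1000) > 0 && (fds[0].revents & POLLIN)) {
            nr = read(fds[0].fd, buf, sizeof (buf) - 1);
            if (nr > 0) {
                buf[nr] = '\0';
                printf("read: %s", buf);
            }
        }

        free(fds);
        close(fd[0]);
        close(fd[1]);
        return 0;
    }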
@@ -237,34 +237,37 @@
             log_warn(TAG "task kill error: %s", strerror(errno));
         }
 
         /* Now wait for the task to complete. */
         if (ret <= 0) {
+            pid = task_pid(iter->task);
+
             if (task_wait(iter->task) < 0)
                 log_warn(TAG "task wait error: %s", strerror(errno));
             else {
                 code = task_code(iter->task);
 
                 switch (task_status(iter->task)) {
                 case TASKSTATUS_EXITED:
                     log_info(TAG "task %lld exited with code %d",
-                        (long long int)task_pid(iter->task), code.exitcode);
+                        (long long int)pid, code.exitcode);
                     break;
                 case TASKSTATUS_KILLED:
                     log_info(TAG "task %lld killed with signal %d",
-                        (long long int)task_pid(iter->task), code.sigcode);
+                        (long long int)pid, code.sigcode);
                     break;
                 default:
                     break;
                 }
             }
 
             /* Remove that task and push to the outgoing queue. */
-            next = iter->next;
             LL_DELETE(tasks, iter);
             LL_APPEND(taskfinished, iter);
         }
+
+        iter = next;
     }
 
     free(fds);
 }
 
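The pid is now captured before task_wait(), so the log messages after the wait reuse it instead of calling task_pid() again. The same ordering with plain fork/waitpid:

    #include <stdio.h>
    #include <sys/wait.h>
    #include <unistd.h>

    int
    main(void)
    {
        pid_t pid;
        int status;

        pid = fork();
        if (pid < 0)
            return 1;
        if (pid == 0)
            _exit(3);       /* child: pretend the task ran and exited */

        /* The pid is saved before waiting; after waitpid() the child is
         * reaped, but we can still report which process just finished. */
        if (waitpid(pid, &status, 0) < 0)
            return 1;
        if (WIFEXITED(status))
            printf("task %lld exited with code %d\n",
                (long long int)pid, WEXITSTATUS(status));
        else if (WIFSIGNALED(status))
            printf("task %lld killed with signal %d\n",
                (long long int)pid, WTERMSIG(status));

        return 0;
    }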
@@ -273,11 +276,10 @@
  */
 static void
 ghost_all(void)
 {
     struct taskentry *iter, *tmp;
-    time_t now;
 
     LL_FOREACH_SAFE(tasks, iter, tmp) {
         if (difftime(time(NULL), task_uptime(iter->task)) < sciworkerd.timeout)
             continue;
 
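ghost_all() skips any task whose runtime, measured with difftime() against task_uptime(), is still below sciworkerd.timeout; everything past that limit is treated as hung. The same check in isolation (TIMEOUT stands in for sciworkerd.timeout):

    #include <stdio.h>
    #include <time.h>

    #define TIMEOUT 60.0    /* stands in for sciworkerd.timeout, in seconds */

    int
    main(void)
    {
        time_t uptime = time(NULL) - 90;        /* task started 90 s ago */

        /* difftime() returns the elapsed time in seconds as a double. */
        if (difftime(time(NULL), uptime) < TIMEOUT)
            printf("still within the allowed runtime\n");
        else
            printf("task exceeded %.0f seconds, would be killed\n", TIMEOUT);

        return 0;
    }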
@@ -302,24 +304,19 @@
         .exitcode = code.exitcode,
         .log = util_strdup(task_console(iter->task)),
         .worker_name = worker.name
     };
     struct apic req;
-    json_t *doc;
-    int ret;
+    int ret = 0;
 
-    doc = jobresult_to(&res, 1);
-    ret = apic_post(&req, doc, "%s/api/v1/jobs", sciworkerd.url);
-    json_decref(doc);
-
-    if (ret)
+    if (apic_jobresult_add(&req, &res) < 0) {
         log_warn(TAG "unable to publish task: %s", req.error);
-    else
-        log_info(TAG "task successfully published");
+        ret = -1;
+    }
 
+    apic_finish(&req);
     jobresult_finish(&res);
-    apic_finish(&req);
 
     return ret;
 }
 
 static void
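The old code serialized the result with jobresult_to(), posted it to "%s/api/v1/jobs" with apic_post() and released the document with json_decref(); the new apic_jobresult_add() call wraps that sequence behind the API client. Its body is not part of this diff, but the jansson half of the old pattern, building the payload and releasing it after use, looks roughly like this sketch (field values are placeholders):

    #include <stdio.h>
    #include <stdlib.h>

    #include <jansson.h>

    int
    main(void)
    {
        json_t *doc;
        char *body;

        /* Build the JSON payload; in sciworkerd this is what jobresult_to()
         * produces from a struct jobresult. */
        doc = json_pack("{s:i, s:s, s:s}",
            "exitcode", 0,
            "log", "build output...",
            "worker_name", "worker-1");
        if (!doc)
            return 1;

        /* The real code hands the document to apic_post() and sends it to
         * "%s/api/v1/jobs"; here we only show the serialize/release pattern. */
        body = json_dumps(doc, JSON_COMPACT);
        printf("POST /api/v1/jobs\n%s\n", body);

        free(body);
        json_decref(doc);
        return 0;
    }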
@@ -356,10 +353,11 @@
 void
 sciworkerd_run(void)
 {
     while (run) {
         fetch_jobs();
+        start_all();
         process_all();
         ghost_all();
         publish_all();
     }
 }