comparison sciworkerd/main.c @ 19:de4bf839b565

misc: revamp SQL
author David Demelier <markand@malikania.fr>
date Fri, 15 Jul 2022 11:11:48 +0200
parents 600204c31bf0
children 2cb228f23f53
comparison
equal deleted inserted replaced
18:600204c31bf0 19:de4bf839b565
14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */ 17 */
18 18
19 #if 0
20
21 #include <sys/queue.h>
22 #include <sys/stat.h>
23 #include <sys/types.h>
24 #include <sys/wait.h>
25 #include <assert.h>
26 #include <err.h>
27 #include <errno.h>
28 #include <fcntl.h>
29 #include <limits.h>
30 #include <poll.h>
31 #include <signal.h>
32 #include <stdio.h> 19 #include <stdio.h>
33 #include <stdlib.h> 20 #include <stdlib.h>
34 #include <stdnoreturn.h>
35 #include <string.h>
36 #include <unistd.h> 21 #include <unistd.h>
37 22
38 #include <curl/curl.h> 23 #include "sciworkerd.h"
39 #include <jansson.h>
40
41 #include "config.h"
42 #include "log.h"
43 #include "types.h"
44 #include "util.h"
45
46 #define TAG_MAX 256
47
48 struct task {
49 enum taskst status;
50 pid_t child;
51 int pipe[2];
52 int exitcode;
53 int job_id;
54 int project_id;
55 char job_tag[TAG_MAX];
56 char out[SCI_CONSOLE_MAX];
57 char script[PATH_MAX];
58 int scriptfd;
59 TAILQ_ENTRY(task) link;
60 };
61
62 TAILQ_HEAD(tasks, task);
63
64 struct fds {
65 struct pollfd *list;
66 size_t listsz;
67 };
68
69 struct fetch {
70 char buf[SCI_MSG_MAX];
71 FILE *bufp;
72 };
73
74 static struct {
75 char *url;
76 char *worker;
77 int maxbuilds;
78 } config = {
79 .url = "http://localhost",
80 .worker = "default",
81 .maxbuilds = 4
82 };
83
84 static struct tasks tasks = TAILQ_HEAD_INITIALIZER(tasks);
85 static struct worker worker;
86 static int alive = 1;
87
88 /*
89 * Show usage and exit with code 1.
90 */
91 noreturn static void
92 usage(void)
93 {
94 fprintf(stderr, "usage: %s [-m maxbuild] [-u url] [-w worker]\n", getprogname());
95 exit(1);
96 }
97
98 /*
99 * Find a task by the read end of its pipe (pipe[0]).
100 */
101 static inline struct task *
102 find_by_fd(int fd)
103 {
104 struct task *tk;
105
106 TAILQ_FOREACH(tk, &tasks, link)
107 if (tk->pipe[0] == fd)
108 return tk;
109
110 return NULL;
111 }
112
113 /*
114 * Find a task by its pid number.
115 */
116 static inline struct task *
117 find_by_pid(pid_t pid)
118 {
119 struct task *t;
120
121 TAILQ_FOREACH(t, &tasks, link)
122 if (t->child == pid)
123 return t;
124
125 return NULL;
126 }
127
128 /*
129 * Destroy a task entirely.
130 */
131 static void
132 destroy(struct task *tk)
133 {
134 log_debug("destroying task %d", tk->job_id);
135 unlink(tk->script);
136
137 if (tk->pipe[0])
138 close(tk->pipe[0]);
139 if (tk->pipe[1])
140 close(tk->pipe[1]);
141 if (tk->scriptfd)
142 close(tk->scriptfd);
143
144 TAILQ_REMOVE(&tasks, tk, link);
145 memset(tk, 0, sizeof (*tk));
146 free(tk);
147 }
148
149 static const char *
150 makeurl(const char *fmt, ...)
151 {
152 assert(fmt);
153
154 static char url[256];
155 char page[128] = {0};
156 va_list ap;
157
158 va_start(ap, fmt);
159 vsnprintf(page, sizeof (page), fmt, ap);
160 va_end(ap);
161
162 snprintf(url, sizeof (url), "%s/%s", config.url, page);
163
164 return url;
165 }
166 24
167 static void 25 static void
168 complete(int signum, siginfo_t *sinfo, void *ctx) 26 env(void)
169 { 27 {
170 (void)ctx; 28 const char *env;
171 (void)signum;
172 29
173 struct task *tk; 30 if ((env = getenv("SCI_URL")))
174 31 snprintf(sciworkerd.url, sizeof (sciworkerd.url), "%s", env);
175 if (waitpid(sinfo->si_pid, NULL, 0) < 0) 32 if ((env = getenv("SCI_WORKER")))
176 log_warn("waitpid: %s", strerror(errno)); 33 snprintf(sciworkerd.name, sizeof (sciworkerd.name), "%s", env);
177
178 if ((tk = find_by_pid(sinfo->si_pid))) {
179 log_debug("process %d terminated (exitcode=%d)",
180 (int)sinfo->si_pid, sinfo->si_status);
181
182 close(tk->pipe[1]);
183 tk->status = TASKST_COMPLETED;
184 tk->exitcode = sinfo->si_status;
185 tk->pipe[1] = 0;
186 }
187 }
188
189 static void
190 stop(int signum)
191 {
192 log_warn("exiting on signal %d", signum);
193 alive = 0;
194 }
195
196 static char *
197 uploadenc(const struct task *tk)
198 {
199 json_t *doc;
200
201 struct jobresult res = {0};
202 char *dump;
203
204 res.job_id = tk->job_id;
205 res.exitcode = tk->exitcode;
206 res.log = tk->out;
207 res.worker_id = worker.id;
208
209 doc = jobresult_to(&res, 1);
210 dump = json_dumps(doc, JSON_COMPACT);
211
212 json_decref(doc);
213
214 return dump;
215 }
216
217 static size_t
218 getcb(char *in, size_t n, size_t w, FILE *fp)
219 {
220 if (fwrite(in, n, w, fp) != w)
221 return log_warn("get: %s", strerror(errno)), 0;
222
223 return w;
224 }
225
226 static json_t *
227 get(const char *topic, const char *url)
228 {
229 CURL *curl;
230 CURLcode code;
231
232 json_t *doc;
233 json_error_t error;
234
235 char buf[SCI_MSG_MAX];
236 long status;
237 FILE *fp;
238
239 curl = curl_easy_init();
240
241 if (!(fp = fmemopen(buf, sizeof (buf), "w")))
242 err(1, "fmemopen");
243
244 curl_easy_setopt(curl, CURLOPT_URL, url);
245 curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
246 curl_easy_setopt(curl, CURLOPT_TIMEOUT, 3L);
247 curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, getcb);
248 curl_easy_setopt(curl, CURLOPT_WRITEDATA, fp);
249
250 if ((code = curl_easy_perform(curl)) != CURLE_OK)
251 log_warn("%s: %s", topic, curl_easy_strerror(code));
252
253 curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status);
254 curl_easy_cleanup(curl);
255
256 fclose(fp);
257
258 if (code != CURLE_OK)
259 return log_warn("%s: %s", topic, curl_easy_strerror(code)), NULL;
260 if (status != 200)
261 return log_warn("%s: unexpected status code %ld", topic, status), NULL;
262 if (!(doc = json_loads(buf, 0, &error)))
263 return log_warn("%s: %s", topic, error.text), NULL;
264
265 return doc;
266 }
267
268 static size_t
269 silent(char *in, size_t n, size_t w, void *data)
270 {
271 (void)in;
272 (void)n;
273 (void)data;
274
275 return w;
276 }
277
278 static void
279 upload(struct task *tk)
280 {
281 CURL *curl;
282 CURLcode code;
283 struct curl_slist *headers = NULL;
284 long status;
285 char *dump;
286
287 curl = curl_easy_init();
288 headers = curl_slist_append(headers, "Content-Type: application/json");
289 curl_easy_setopt(curl, CURLOPT_URL, makeurl("api/v1/jobs"));
290 //curl_easy_setopt(curl, CURLOPT_URL, "http://localhost:4000");
291 curl_easy_setopt(curl, CURLOPT_TIMEOUT, 3L);
292 curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
293 curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, silent);
294 curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (dump = uploadenc(tk)));
295 curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, strlen(dump));
296 curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
297 code = curl_easy_perform(curl);
298 curl_slist_free_all(headers);
299
300 /*
301 * Put the task into syncing mode before checking the result, so that a
302 * failed upload is retried later without running the job again.
303 */
304 tk->status = TASKST_SYNCING;
305
306 if (code != CURLE_OK)
307 log_warn("upload: %s", curl_easy_strerror(code));
308 else {
309 curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status);
310
311 if (status != 200)
312 log_warn("upload: unexpected return code: %ld", status);
313 else
314 destroy(tk);
315 }
316
317 free(dump);
318 curl_easy_cleanup(curl);
319 }
320
321 static inline int
322 pending(int id)
323 {
324 struct task *t;
325
326 TAILQ_FOREACH(t, &tasks, link)
327 if (t->job_id == id)
328 return 1;
329
330 return 0;
331 }
332
333 static void
334 queue(int id, int project_id, const char *tag)
335 {
336 struct task *tk;
337
338 log_info("queued job build (%d) for tag %s\n", id, tag);
339
340 tk = util_calloc(1, sizeof (*tk));
341 tk->job_id = id;
342 tk->project_id = project_id;
343 strlcpy(tk->job_tag, tag, sizeof (tk->job_tag));
344
345 TAILQ_INSERT_TAIL(&tasks, tk, link);
346 }
347
348 static void
349 merge(json_t *doc)
350 {
351 struct job jobs[SCI_JOB_LIST_MAX];
352 ssize_t jobsz;
353
354 if ((jobsz = job_from(jobs, UTIL_SIZE(jobs), doc)) < 0)
355 log_warn("fetchjobs: %s", strerror(errno));
356 else {
357 for (ssize_t i = 0; i < jobsz; ++i) {
358 if (!pending(jobs[i].id))
359 queue(jobs[i].id, jobs[i].project_id, jobs[i].tag);
360 }
361 }
362
363 json_decref(doc);
364 }
365
366 static void
367 fetchjobs(void)
368 {
369 json_t *doc;
370
371 if (!(doc = get("fetch", makeurl("api/v1/jobs/%s", config.worker))))
372 log_warn("unable to retrieve jobs");
373 else
374 merge(doc);
375 }
376
377 /*
378 * Read pending stdout/stderr output from each child's pipe and move a task to
379 * the syncing state once its pipe reports end-of-file or a read error.
380 */
381 static void
382 readall(struct fds *fds)
383 {
384 struct task *tk;
385 char buf[BUFSIZ];
386 ssize_t nr;
387
388 for (size_t i = 0; i < fds->listsz; ++i) {
389 if (fds->list[i].revents == 0)
390 continue;
391 if (!(tk = find_by_fd(fds->list[i].fd)))
392 continue;
393
394 /* Read stdout/stderr from children pipe. */
395 if ((nr = read(fds->list[i].fd, buf, sizeof (buf) - 1)) <= 0)
396 tk->status = TASKST_SYNCING;
397 else {
398 buf[nr] = 0;
399 strlcat(tk->out, buf, sizeof (tk->out));
400 }
401 }
402 }
403
404 /*
405 * Upload the result of every task in the syncing state, which includes tasks
406 * whose previous upload failed.
407 */
408 static void
409 flushall(void)
410 {
411 struct task *tk, *tmp;
412
413 TAILQ_FOREACH_SAFE(tk, &tasks, link, tmp)
414 if (tk->status == TASKST_SYNCING)
415 upload(tk);
416 }
417
418 static int
419 extract(struct task *tk, json_t *doc)
420 {
421 struct project proj;
422 size_t len;
423
424 if (project_from(&proj, 1, doc) < 0) {
425 json_decref(doc);
426 log_warn("fetchproject: %s", strerror(errno));
427 return -1;
428 }
429
430 len = strlen(proj.script);
431
432 if ((size_t)write(tk->scriptfd, proj.script, len) != len) {
433 json_decref(doc);
434 log_warn("fetchproject: %s", strerror(errno));
435 return -1;
436 }
437
438 /* Close so we can finally spawn it. */
439 close(tk->scriptfd);
440 tk->scriptfd = 0;
441
442 return 0;
443 }
444
445 static int
446 fetchproject(struct task *tk)
447 {
448 json_t *doc;
449
450 if (!(doc = get("fetchproject", makeurl("api/v1/projects/%d", tk->project_id))))
451 return -1;
452
453 return extract(tk, doc);
454 }
455
456 /*
457 * Prepare and spawn a pending task: the project script is fetched at this
458 * point, written to a temporary executable file and then spawned.
459 */
460 static void
461 createtask(struct task *tk)
462 {
463 if (tk->status != TASKST_PENDING)
464 return;
465
466 log_debug("creating task (id=%d, tag=%s)", tk->job_id, tk->job_tag);
467 snprintf(tk->script, sizeof (tk->script), "/tmp/sciworkerd-%d-XXXXXX", tk->job_id);
468
469 if ((tk->scriptfd = mkstemp(tk->script)) < 0 ||
470 fchmod(tk->scriptfd, S_IRUSR | S_IWUSR | S_IXUSR) < 0) {
471 unlink(tk->script);
472 log_warn("%s", strerror(errno));
473 return;
474 }
475
476 if (fetchproject(tk) < 0) {
477 unlink(tk->script);
478 close(tk->scriptfd);
479 tk->scriptfd = 0;
480 } else
481 spawn(tk);
482 }
483
484 /*
485 * Start all pending tasks if the limit of running tasks is not reached.
486 */
487 static void
488 startall(void)
489 {
490 size_t nrunning = 0;
491 struct task *tk;
492
493 TAILQ_FOREACH(tk, &tasks, link)
494 if (tk->status == TASKST_RUNNING)
495 ++nrunning;
496
497 if (nrunning >= (size_t)config.maxbuilds)
498 log_debug("not spawning new process because limit is reached");
499 else {
500 tk = TAILQ_FIRST(&tasks);
501
502 while (tk && nrunning++ < (size_t)config.maxbuilds) {
503 createtask(tk);
504 tk = TAILQ_NEXT(tk, link);
505 }
506 }
507 }
508
509 static void
510 fetchworker(void)
511 {
512 json_t *doc;
513
514 if (!(doc = get("fetchworker", makeurl("api/v1/workers/%s", config.worker))) ||
515 worker_from(&worker, 1, doc) < 0)
516 errx(1, "unable to retrieve worker id");
517
518 log_info("worker id: %d", worker.id);
519 log_info("worker name: %s", worker.name);
520 log_info("worker description: %s", worker.desc);
521
522 json_decref(doc);
523 }
524
525 static void
526 init(void)
527 {
528 struct sigaction sa;
529
530 sa.sa_flags = SA_SIGINFO | SA_RESTART;
531 sa.sa_sigaction = complete;
532 sigemptyset(&sa.sa_mask);
533
534 if (sigaction(SIGCHLD, &sa, NULL) < 0)
535 err(1, "sigaction");
536
537 sa.sa_flags = SA_RESTART;
538 sa.sa_handler = stop;
539 sigemptyset(&sa.sa_mask);
540
541 if (sigaction(SIGTERM, &sa, NULL) < 0 || sigaction(SIGINT, &sa, NULL) < 0)
542 err(1, "sigaction");
543
544 log_open("sciworkerd");
545 fetchworker();
546 }
547
548 static struct fds
549 prepare(void)
550 {
551 struct fds fds = {0};
552 struct task *tk;
553 size_t i = 0;
554
555 TAILQ_FOREACH(tk, &tasks, link)
556 if (tk->status == TASKST_RUNNING || tk->status == TASKST_COMPLETED)
557 fds.listsz++;
558
559 fds.list = util_calloc(fds.listsz, sizeof (*fds.list));
560
561 TAILQ_FOREACH(tk, &tasks, link) {
562 if (tk->status == TASKST_RUNNING || tk->status == TASKST_COMPLETED) {
563 fds.list[i].fd = tk->pipe[0];
564 fds.list[i++].events = POLLIN | POLLPRI;
565 }
566 }
567
568 return fds;
569 }
570
571 static void
572 run(void)
573 {
574 struct fds fds;
575
576 fds = prepare();
577
578 if (poll(fds.list, fds.listsz, 5000) < 0 && errno != EINTR)
579 err(1, "poll");
580
581 fetchjobs();
582 readall(&fds);
583 startall();
584 flushall();
585 }
586
587 static void
588 finish(void)
589 {
590 size_t tot = 0;
591 struct task *tk, *tmp;
592
593 TAILQ_FOREACH(tk, &tasks, link)
594 tot++;
595
596 signal(SIGCHLD, SIG_IGN);
597 log_debug("killing remaining %zu tasks", tot);
598
599 TAILQ_FOREACH_SAFE(tk, &tasks, link, tmp) {
600 if (tk->status == TASKST_RUNNING) {
601 kill(tk->child, SIGTERM);
602 waitpid(tk->child, NULL, 0);
603 }
604
605 destroy(tk);
606 }
607 } 34 }
608 35
609 int 36 int
610 main(int argc, char **argv) 37 main(int argc, char **argv)
611 { 38 {
612 int ch; 39 int ch, val;
613 const char *errstr;
614 40
615 setprogname("sciworkerd"); 41 env();
42 opterr = 0;
616 43
617 while ((ch = getopt(argc, argv, "m:u:w:")) != -1) { 44 while ((ch = getopt(argc, argv, "j:t:u:w:")) != -1) {
618 switch (ch) { 45 switch (ch) {
619 case 'm': 46 case 'j':
620 config.maxbuilds = strtonum(optarg, 0, INT_MAX, &errstr); 47 if ((val = atoi(optarg)) > 0)
621 48 sciworkerd.maxjobs = val;
622 if (errstr) 49 break;
623 errx(1, "%s: %s", optarg, errstr); 50 case 't':
624 51 if ((val = atoi(optarg)) > 0)
52 sciworkerd.timeout = val;
625 break; 53 break;
626 case 'u': 54 case 'u':
627 config.url = optarg; 55 snprintf(sciworkerd.url, sizeof (sciworkerd.url), "%s", optarg);
628 break; 56 break;
629 case 'w': 57 case 'w':
630 config.worker = optarg; 58 snprintf(sciworkerd.name, sizeof (sciworkerd.name), "%s", optarg);
631 break; 59 break;
632 default: 60 default:
633 usage();
634 break; 61 break;
635 } 62 }
636 } 63 }
637
638 init();
639
640 while (alive)
641 run();
642
643 finish();
644 } 64 }
645 #endif
646
647
648
649
650
651
652
653
654
655
656 #include <err.h>
657 #include <errno.h>
658 #include <poll.h>
659 #include <signal.h>
660 #include <string.h>
661 #include <time.h>
662 #include <unistd.h>
663
664 #include "types.h"
665 #include "task.h"
666
667 #define SCRIPT \
668 "#!/bin/sh\n" \
669 "echo yes\n" \
670 "sleep 10\n" \
671 "echo no 1>&2\n" \
672 "sleep 1\n" \
673 "exit 1"
674
675 int
676 main(void)
677 {
678 struct job job = {
679 .project_id = 10,
680 .id = 10,
681 .tag = "1234"
682 };
683 struct sigaction sa = {0};
684 struct pollfd fd;
685 struct task *t;
686 int run = 1;
687
688 t = task_new(&job);
689
690 if (task_setup(t, SCRIPT) < 0)
691 err(1, "task_set_script");
692 if (task_start(t) < 0)
693 err(1, "task_start");
694
695 while (run) {
696 if (difftime(time(NULL), task_uptime(t)) >= 3) {
697 printf("task timeout !\n");
698 task_kill(t);
699 task_wait(t);
700 break;
701 }
702
703 task_prepare(t, &fd);
704
705 if (poll(&fd, 1, 250) < 0 && errno != EINTR)
706 err(1, "poll");
707
708 switch (task_sync(t, &fd)) {
709 case -1:
710 err(1, "task_sync");
711 case 0:
712 run = 0;
713 task_wait(t);
714 break;
715 default:
716 /* Keep going... */
717 break;
718 }
719 }
720
721 switch (task_status(t)) {
722 case TASKSTATUS_EXITED:
723 printf("process exited with code: %d\n", task_code(t).exitcode);
724 break;
725 case TASKSTATUS_KILLED:
726 printf("process killed with signal %d\n", task_code(t).sigcode);
727 break;
728 default:
729 break;
730 }
731
732 printf("== console ==\n%s==\n", task_console(t));
733 task_free(t);
734 }