Mercurial > sci
comparison sciworkerd/sciworkerd.c @ 27:dae2de19ca5d
misc: switch to JSON everywhere
author | David Demelier <markand@malikania.fr> |
---|---|
date | Wed, 03 Aug 2022 15:18:09 +0200 |
parents | 34cbbd215ef7 |
children | e52c762d8ba8 |
comparison
equal
deleted
inserted
replaced
26:7e10cace67a3 | 27:dae2de19ca5d |
---|---|
1 #include <errno.h> | 1 #include <errno.h> |
2 #include <poll.h> | 2 #include <poll.h> |
3 #include <signal.h> | 3 #include <signal.h> |
4 #include <stdint.h> | |
4 #include <string.h> | 5 #include <string.h> |
5 #include <time.h> | 6 #include <time.h> |
6 | 7 |
7 #include <utlist.h> | 8 #include <utlist.h> |
8 | 9 |
9 #include "apic.h" | 10 #include "apic.h" |
10 #include "log.h" | 11 #include "log.h" |
11 #include "sciworkerd.h" | 12 #include "sciworkerd.h" |
12 #include "task.h" | 13 #include "task.h" |
13 #include "types.h" | |
14 #include "util.h" | 14 #include "util.h" |
15 | 15 |
16 #define TAG "sciworkerd: " | 16 #define TAG "sciworkerd: " |
17 | 17 |
18 struct taskentry { | 18 struct taskentry { |
19 intmax_t job_id; | |
20 char *tag; | |
21 char *project_name; | |
19 struct task *task; | 22 struct task *task; |
20 struct job job; | |
21 struct taskentry *next; | 23 struct taskentry *next; |
22 }; | 24 }; |
23 | 25 |
24 static struct taskentry *taskpending; | 26 static struct taskentry *taskpending; |
25 static struct taskentry *tasks; | 27 static struct taskentry *tasks; |
26 static struct taskentry *taskfinished; | 28 static struct taskentry *taskfinished; |
27 static struct worker worker; | |
28 static int run = 1; | 29 static int run = 1; |
29 | 30 |
30 struct sciworkerd sciworkerd = { | 31 struct sciworkerd sciworkerd = { |
31 .fetchinterval = 300, | 32 .fetchinterval = 300, |
32 .maxjobs = 4, | 33 .maxjobs = 4, |
51 log_info(TAG "exiting on signal %d", sign); | 52 log_info(TAG "exiting on signal %d", sign); |
52 run = 0; | 53 run = 0; |
53 } | 54 } |
54 | 55 |
55 static inline int | 56 static inline int |
56 pending(int id) | 57 pending(intmax_t job_id) |
57 { | 58 { |
58 const struct taskentry *iter; | 59 const struct taskentry *iter; |
59 | 60 |
60 LL_FOREACH(taskpending, iter) | 61 LL_FOREACH(taskpending, iter) |
61 if (iter->job.id == id) | 62 if (iter->job_id == job_id) |
62 return 1; | 63 return 1; |
63 | 64 |
64 return 0; | 65 return 0; |
65 } | 66 } |
66 | 67 |
67 static inline void | 68 static inline void |
68 queue(const struct job *job) | 69 queue(intmax_t id, const char *tag, const char *project_name) |
69 { | 70 { |
70 struct taskentry *tk; | 71 struct taskentry *tk; |
71 | 72 |
72 log_info(TAG "queued job build (%d) for tag %s", job->id, job->tag); | 73 log_info(TAG "queued job build (%d) for tag %s", id, tag); |
73 | 74 |
74 tk = util_calloc(1, sizeof (*tk)); | 75 tk = util_calloc(1, sizeof (*tk)); |
75 tk->task = task_new(job->tag); | 76 tk->job_id = id; |
76 memcpy(&tk->job, job, sizeof (*job)); | 77 tk->tag = util_strdup(tag); |
78 tk->project_name = util_strdup(project_name); | |
79 tk->task = task_new(tag); | |
77 LL_APPEND(taskpending, tk); | 80 LL_APPEND(taskpending, tk); |
78 } | 81 } |
79 | 82 |
80 static void | 83 static void |
81 merge(struct job *jobs, size_t jobsz) | 84 merge(json_t *jobs) |
82 { | 85 { |
83 size_t total = 0; | 86 json_int_t id; |
84 | 87 json_t *val; |
85 for (size_t i = 0; i < jobsz; ++i) { | 88 const char *tag, *project_name; |
86 if (!pending(jobs[i].id)) { | 89 size_t total = 0, i; |
87 queue(&jobs[i]); | 90 int parse; |
91 | |
92 json_array_foreach(jobs, i, val) { | |
93 parse = json_unpack(val, "{sI ss ss}", | |
94 "id", &id, | |
95 "tag", &tag, | |
96 "project_name", &project_name | |
97 ); | |
98 | |
99 if (parse < 0) { | |
100 log_warn(TAG "unable to parse job"); | |
101 continue; | |
102 } | |
103 | |
104 if (!pending(id)) { | |
105 queue(id, tag, project_name); | |
88 total++; | 106 total++; |
89 } else | 107 } |
90 job_finish(&jobs[i]); | |
91 } | 108 } |
92 | 109 |
93 log_info(TAG "added %zu new pending tasks", total); | 110 log_info(TAG "added %zu new pending tasks", total); |
94 } | 111 } |
95 | 112 |
100 fetch_jobs(void) | 117 fetch_jobs(void) |
101 { | 118 { |
102 static time_t startup; | 119 static time_t startup; |
103 time_t now; | 120 time_t now; |
104 struct apic req; | 121 struct apic req; |
105 struct job todo[SCI_JOB_LIST_MAX]; | 122 json_t *jobs; |
106 ssize_t todosz; | |
107 | 123 |
108 if (difftime((now = time(NULL)), startup) >= sciworkerd.fetchinterval) { | 124 if (difftime((now = time(NULL)), startup) >= sciworkerd.fetchinterval) { |
109 startup = now; | 125 startup = now; |
110 | |
111 log_info(TAG "fetching jobs"); | 126 log_info(TAG "fetching jobs"); |
112 | 127 |
113 if ((todosz = apic_job_todo(&req, todo, UTIL_SIZE(todo), worker.name)) < 0) | 128 if (!(jobs = apic_job_todo(&req, sciworkerd.name))) |
114 log_warn(TAG "unable to fetch jobs: %s", req.error); | 129 log_warn(TAG "unable to fetch jobs: %s", req.error); |
115 else | 130 else { |
116 merge(todo, todosz); | 131 merge(jobs); |
117 | 132 json_decref(jobs); |
118 apic_finish(&req); | 133 } |
119 } | 134 } |
120 } | |
121 | |
122 /* | |
123 * Fetch information about myself. | |
124 */ | |
125 static void | |
126 fetch_worker(void) | |
127 { | |
128 struct apic req; | |
129 | |
130 if (apic_worker_find(&req, &worker, sciworkerd.name) < 0) | |
131 log_die(TAG "unable to fetch worker info: %s", req.error); | |
132 | |
133 log_info("sciworkerd: worker %s (%s)", worker.name, worker.desc); | |
134 apic_finish(&req); | |
135 } | 135 } |
136 | 136 |
137 static inline size_t | 137 static inline size_t |
138 count(const struct taskentry *head) | 138 count(const struct taskentry *head) |
139 { | 139 { |
152 */ | 152 */ |
153 static int | 153 static int |
154 start(struct taskentry *entry) | 154 start(struct taskentry *entry) |
155 { | 155 { |
156 struct apic req; | 156 struct apic req; |
157 struct project project; | 157 json_t *doc; |
158 const char *script; | |
158 pid_t pid; | 159 pid_t pid; |
159 int ret = -1; | 160 int ret = -1; |
160 | 161 |
161 if (apic_project_find(&req, &project, entry->job.project_name) < 0) | 162 if (!(doc = apic_project_find(&req, entry->project_name))) |
162 return log_warn(TAG "unable to fetch project, dropping task"), -1; | 163 return log_warn(TAG "unable to fetch project, dropping task"), -1; |
163 | 164 if (json_unpack(doc, "{ss}", "script", &script) < 0) { |
164 if (task_setup(entry->task, project.script) < 0) | 165 json_decref(doc); |
166 return log_warn(TAG "invalid project JSON object"), -1; | |
167 } | |
168 | |
169 if (task_setup(entry->task, script) < 0) | |
165 log_warn(TAG "unable to setup script code: %s, dropping task", strerror(errno)); | 170 log_warn(TAG "unable to setup script code: %s, dropping task", strerror(errno)); |
166 else if ((pid = task_start(entry->task)) < 0) | 171 else if ((pid = task_start(entry->task)) < 0) |
167 log_warn(TAG "unable to spawn task process: %s", strerror(errno)); | 172 log_warn(TAG "unable to spawn task process: %s", strerror(errno)); |
168 else { | 173 else { |
169 log_info(TAG "task %lld spawned", (long long int)pid); | 174 log_info(TAG "task %lld spawned", (long long int)pid); |
170 ret = 0; | 175 ret = 0; |
171 } | 176 } |
172 | 177 |
173 project_finish(&project); | 178 json_decref(doc); |
174 | 179 |
175 return ret; | 180 return ret; |
176 } | 181 } |
177 | 182 |
178 static inline void | 183 static inline void |
295 } | 300 } |
296 | 301 |
297 static int | 302 static int |
298 publish(struct taskentry *iter) | 303 publish(struct taskentry *iter) |
299 { | 304 { |
300 // TODO: add sigcode. | |
301 struct taskcode code = task_code(iter->task); | |
302 struct jobresult res = { | |
303 .job_id = iter->job.id, | |
304 .exitcode = code.exitcode, | |
305 .log = util_strdup(task_console(iter->task)), | |
306 .worker_name = worker.name | |
307 }; | |
308 struct apic req; | 305 struct apic req; |
306 json_t *obj; | |
309 int ret = 0; | 307 int ret = 0; |
310 | 308 |
311 if (apic_jobresult_add(&req, &res) < 0) { | 309 obj = json_pack("{sI ss ss si si}", |
310 "job_id", iter->job_id, | |
311 "worker_name", sciworkerd.name, | |
312 "console", task_console(iter->task), | |
313 "exitcode", task_code(iter->task).exitcode, | |
314 "sigcode", task_code(iter->task).sigcode | |
315 ); | |
316 | |
317 if (apic_jobresult_add(&req, obj) < 0) { | |
312 log_warn(TAG "unable to publish task: %s", req.error); | 318 log_warn(TAG "unable to publish task: %s", req.error); |
313 ret = -1; | 319 ret = -1; |
314 } | 320 } |
315 | 321 |
316 apic_finish(&req); | 322 json_decref(obj); |
317 jobresult_finish(&res); | |
318 | 323 |
319 return ret; | 324 return ret; |
320 } | 325 } |
321 | 326 |
322 static void | 327 static void |
337 { | 342 { |
338 struct sigaction sa = {0}; | 343 struct sigaction sa = {0}; |
339 | 344 |
340 log_open("sigworkerd"); | 345 log_open("sigworkerd"); |
341 | 346 |
347 if (strlen(sciworkerd.name) == 0) | |
348 log_die(TAG "no worker name defined"); | |
349 | |
342 sigemptyset(&sa.sa_mask); | 350 sigemptyset(&sa.sa_mask); |
343 sa.sa_handler = stop; | 351 sa.sa_handler = stop; |
344 | 352 |
345 util_strlcpy(apiconf.baseurl, sciworkerd.url, sizeof (apiconf.baseurl)); | 353 util_strlcpy(apiconf.baseurl, sciworkerd.url, sizeof (apiconf.baseurl)); |
346 | 354 |
347 if (sigaction(SIGINT, &sa, NULL) < 0 || sigaction(SIGTERM, &sa, NULL) < 0) | 355 if (sigaction(SIGINT, &sa, NULL) < 0 || sigaction(SIGTERM, &sa, NULL) < 0) |
348 log_die(TAG "sigaction: %s", strerror(errno)); | 356 log_die(TAG "sigaction: %s", strerror(errno)); |
349 | |
350 fetch_worker(); | |
351 } | 357 } |
352 | 358 |
353 void | 359 void |
354 sciworkerd_run(void) | 360 sciworkerd_run(void) |
355 { | 361 { |