Mercurial > sci
comparison sciworkerd/sciworkerd.c @ 23:2cb228f23f53
misc: rework todo/jobs HTTP requests
author | David Demelier <markand@malikania.fr> |
---|---|
date | Thu, 21 Jul 2022 21:55:02 +0200 |
parents | f98ea578b1ef |
children | 34cbbd215ef7 |
comparison
equal
deleted
inserted
replaced
22:dd078aea5d02 | 23:2cb228f23f53 |
---|---|
76 memcpy(&tk->job, job, sizeof (*job)); | 76 memcpy(&tk->job, job, sizeof (*job)); |
77 LL_APPEND(taskpending, tk); | 77 LL_APPEND(taskpending, tk); |
78 } | 78 } |
79 | 79 |
80 static void | 80 static void |
81 merge(const struct job *jobs, size_t jobsz) | 81 merge(struct job *jobs, size_t jobsz) |
82 { | 82 { |
83 size_t total = 0; | 83 size_t total = 0; |
84 | 84 |
85 for (ssize_t i = 0; i < jobsz; ++i) { | 85 for (ssize_t i = 0; i < jobsz; ++i) { |
86 if (!pending(jobs[i].id)) { | 86 if (!pending(jobs[i].id)) { |
87 queue(&jobs[i]); | 87 queue(&jobs[i]); |
88 total++; | 88 total++; |
89 } | 89 } else |
90 job_finish(&jobs[i]); | |
90 } | 91 } |
91 | 92 |
92 log_info(TAG "added %zu new pending tasks", total); | 93 log_info(TAG "added %zu new pending tasks", total); |
93 } | 94 } |
94 | 95 |
102 time_t now; | 103 time_t now; |
103 struct apic req; | 104 struct apic req; |
104 struct job todo[SCI_JOB_LIST_MAX]; | 105 struct job todo[SCI_JOB_LIST_MAX]; |
105 ssize_t todosz; | 106 ssize_t todosz; |
106 | 107 |
107 if (!startup) | |
108 startup = time(NULL); | |
109 | |
110 if (difftime((now = time(NULL)), startup) >= sciworkerd.fetchinterval) { | 108 if (difftime((now = time(NULL)), startup) >= sciworkerd.fetchinterval) { |
111 startup = now; | 109 startup = now; |
112 | 110 |
113 if ((todosz = apic_job_todo(&req, todo, UTIL_SIZE(todo), worker.id)) < 0) | 111 log_info(TAG "fetching jobs"); |
112 | |
113 if ((todosz = apic_job_todo(&req, todo, UTIL_SIZE(todo), worker.name)) < 0) | |
114 log_warn(TAG "unable to fetch jobs: %s", req.error); | 114 log_warn(TAG "unable to fetch jobs: %s", req.error); |
115 else | 115 else |
116 merge(todo, todosz); | 116 merge(todo, todosz); |
117 | 117 |
118 apic_finish(&req); | 118 apic_finish(&req); |
125 static void | 125 static void |
126 fetch_worker(void) | 126 fetch_worker(void) |
127 { | 127 { |
128 struct apic req; | 128 struct apic req; |
129 | 129 |
130 util_strlcpy(&worker.name, sciworkerd.name); | 130 if (apic_worker_find(&req, &worker, sciworkerd.name) < 0) |
131 | 131 log_die(TAG "unable to fetch worker info: %s", req.error); |
132 if (apic_worker_find(&req, &worker) < 0) | 132 |
133 log_die(TAG, "unable to fetch worker info: %s", req.error); | |
134 | |
135 log_info("worker id: %d", worker.id); | |
136 log_info("worker name: %s", worker.name); | 133 log_info("worker name: %s", worker.name); |
137 log_info("worker description: %s", worker.desc); | 134 log_info("worker description: %s", worker.desc); |
138 | 135 |
139 apic_finish(&req); | 136 apic_finish(&req); |
140 } | |
141 | |
142 /* | |
143 * Fetch information about a project. | |
144 */ | |
145 static int | |
146 fetch_project(struct project *project, int id) | |
147 { | |
148 struct apic req; | |
149 | |
150 if (apic_project_find_id(&req, project, id) < 0) | |
151 return -1; | |
152 #if 0 | |
153 if (apic_get(&req, "%s/api/v1/projects/%d", id) < 0) | |
154 return log_warn(TAG "unable to fetch project info: %s", req.error), -1; | |
155 if (!req.doc) | |
156 return log_warn(TAG "empty project response"), -1; | |
157 if (project_from(project, 1, req.doc) < 0) | |
158 return log_warn(TAG "unable to parse project: %s", strerror(errno)), -1; | |
159 #endif | |
160 | |
161 return 0; | |
162 } | 137 } |
163 | 138 |
164 static inline size_t | 139 static inline size_t |
165 count(const struct taskentry *head) | 140 count(const struct taskentry *head) |
166 { | 141 { |
178 * script. | 153 * script. |
179 */ | 154 */ |
180 static int | 155 static int |
181 start(struct taskentry *entry) | 156 start(struct taskentry *entry) |
182 { | 157 { |
183 struct apic; | 158 struct apic req; |
184 struct project project; | 159 struct project project; |
185 pid_t pid; | 160 pid_t pid; |
186 | 161 int ret = -1; |
187 if (apic_project_find_id(&project, entry->job.project_id) < 0) | 162 |
163 if (apic_project_find(&req, &project, entry->job.project_name) < 0) | |
188 return log_warn(TAG "unable to fetch project, dropping task"), -1; | 164 return log_warn(TAG "unable to fetch project, dropping task"), -1; |
165 | |
189 if (task_setup(entry->task, project.script) < 0) | 166 if (task_setup(entry->task, project.script) < 0) |
190 return log_warn(TAG "unable to setup script code: %s, dropping task", strerror(errno)), -1; | 167 log_warn(TAG "unable to setup script code: %s, dropping task", strerror(errno)); |
191 if ((pid = task_start(entry->task)) < 0) | 168 else if ((pid = task_start(entry->task)) < 0) |
192 return log_warn(TAG "unable to spawn task process: %s", strerror(errno)), -1; | 169 log_warn(TAG "unable to spawn task process: %s", strerror(errno)); |
193 | 170 else { |
194 log_info(TAG "task %lld spawned", (long long int)pid); | 171 log_info(TAG "task %lld spawned", (long long int)pid); |
195 | 172 ret = 0; |
196 return 0; | 173 } |
174 | |
175 project_finish(&project); | |
176 | |
177 return ret; | |
197 } | 178 } |
198 | 179 |
199 static inline void | 180 static inline void |
200 delete(struct taskentry *entry) | 181 delete(struct taskentry *entry) |
201 { | 182 { |
317 // TODO: add sigcode. | 298 // TODO: add sigcode. |
318 struct taskcode code = task_code(iter->task); | 299 struct taskcode code = task_code(iter->task); |
319 struct jobresult res = { | 300 struct jobresult res = { |
320 .job_id = iter->job.id, | 301 .job_id = iter->job.id, |
321 .exitcode = code.exitcode, | 302 .exitcode = code.exitcode, |
322 .log = task_console(iter->task), | 303 .log = util_strdup(task_console(iter->task)), |
323 .worker_id = worker.id | 304 .worker_name = worker.name |
324 }; | 305 }; |
325 struct apicreq req; | 306 struct apic req; |
326 json_t *doc; | 307 json_t *doc; |
327 int ret; | 308 int ret; |
328 | 309 |
329 doc = jobresult_to(&res, 1); | 310 doc = jobresult_to(&res, 1); |
330 ret = apic_post(&req, doc, "%s/api/v1/jobs", sciworkerd.url); | 311 ret = apic_post(&req, doc, "%s/api/v1/jobs", sciworkerd.url); |
332 | 313 |
333 if (ret) | 314 if (ret) |
334 log_warn(TAG "unable to publish task: %s", req.error); | 315 log_warn(TAG "unable to publish task: %s", req.error); |
335 else | 316 else |
336 log_info(TAG "task successfully published"); | 317 log_info(TAG "task successfully published"); |
318 | |
319 jobresult_finish(&res); | |
320 apic_finish(&req); | |
337 | 321 |
338 return ret; | 322 return ret; |
339 } | 323 } |
340 | 324 |
341 static void | 325 static void |
358 | 342 |
359 log_open("sigworkerd"); | 343 log_open("sigworkerd"); |
360 | 344 |
361 sigemptyset(&sa.sa_mask); | 345 sigemptyset(&sa.sa_mask); |
362 sa.sa_handler = stop; | 346 sa.sa_handler = stop; |
347 | |
348 util_strlcpy(apiconf.baseurl, sciworkerd.url, sizeof (apiconf.baseurl)); | |
363 | 349 |
364 if (sigaction(SIGINT, &sa, NULL) < 0 || sigaction(SIGTERM, &sa, NULL) < 0) | 350 if (sigaction(SIGINT, &sa, NULL) < 0 || sigaction(SIGTERM, &sa, NULL) < 0) |
365 log_die(TAG "sigaction: %s", strerror(errno)); | 351 log_die(TAG "sigaction: %s", strerror(errno)); |
366 | 352 |
367 fetch_worker(); | 353 fetch_worker(); |