comparison sciworkerd/sciworkerd.c @ 23:2cb228f23f53

misc: rework todo/jobs HTTP requests
author David Demelier <markand@malikania.fr>
date Thu, 21 Jul 2022 21:55:02 +0200
parents f98ea578b1ef
children 34cbbd215ef7
comparison
equal deleted inserted replaced
22:dd078aea5d02 23:2cb228f23f53
76 memcpy(&tk->job, job, sizeof (*job)); 76 memcpy(&tk->job, job, sizeof (*job));
77 LL_APPEND(taskpending, tk); 77 LL_APPEND(taskpending, tk);
78 } 78 }
79 79
80 static void 80 static void
81 merge(const struct job *jobs, size_t jobsz) 81 merge(struct job *jobs, size_t jobsz)
82 { 82 {
83 size_t total = 0; 83 size_t total = 0;
84 84
85 for (ssize_t i = 0; i < jobsz; ++i) { 85 for (ssize_t i = 0; i < jobsz; ++i) {
86 if (!pending(jobs[i].id)) { 86 if (!pending(jobs[i].id)) {
87 queue(&jobs[i]); 87 queue(&jobs[i]);
88 total++; 88 total++;
89 } 89 } else
90 job_finish(&jobs[i]);
90 } 91 }
91 92
92 log_info(TAG "added %zu new pending tasks", total); 93 log_info(TAG "added %zu new pending tasks", total);
93 } 94 }
94 95
102 time_t now; 103 time_t now;
103 struct apic req; 104 struct apic req;
104 struct job todo[SCI_JOB_LIST_MAX]; 105 struct job todo[SCI_JOB_LIST_MAX];
105 ssize_t todosz; 106 ssize_t todosz;
106 107
107 if (!startup)
108 startup = time(NULL);
109
110 if (difftime((now = time(NULL)), startup) >= sciworkerd.fetchinterval) { 108 if (difftime((now = time(NULL)), startup) >= sciworkerd.fetchinterval) {
111 startup = now; 109 startup = now;
112 110
113 if ((todosz = apic_job_todo(&req, todo, UTIL_SIZE(todo), worker.id)) < 0) 111 log_info(TAG "fetching jobs");
112
113 if ((todosz = apic_job_todo(&req, todo, UTIL_SIZE(todo), worker.name)) < 0)
114 log_warn(TAG "unable to fetch jobs: %s", req.error); 114 log_warn(TAG "unable to fetch jobs: %s", req.error);
115 else 115 else
116 merge(todo, todosz); 116 merge(todo, todosz);
117 117
118 apic_finish(&req); 118 apic_finish(&req);
125 static void 125 static void
126 fetch_worker(void) 126 fetch_worker(void)
127 { 127 {
128 struct apic req; 128 struct apic req;
129 129
130 util_strlcpy(&worker.name, sciworkerd.name); 130 if (apic_worker_find(&req, &worker, sciworkerd.name) < 0)
131 131 log_die(TAG "unable to fetch worker info: %s", req.error);
132 if (apic_worker_find(&req, &worker) < 0) 132
133 log_die(TAG, "unable to fetch worker info: %s", req.error);
134
135 log_info("worker id: %d", worker.id);
136 log_info("worker name: %s", worker.name); 133 log_info("worker name: %s", worker.name);
137 log_info("worker description: %s", worker.desc); 134 log_info("worker description: %s", worker.desc);
138 135
139 apic_finish(&req); 136 apic_finish(&req);
140 }
141
142 /*
143 * Fetch information about a project.
144 */
145 static int
146 fetch_project(struct project *project, int id)
147 {
148 struct apic req;
149
150 if (apic_project_find_id(&req, project, id) < 0)
151 return -1;
152 #if 0
153 if (apic_get(&req, "%s/api/v1/projects/%d", id) < 0)
154 return log_warn(TAG "unable to fetch project info: %s", req.error), -1;
155 if (!req.doc)
156 return log_warn(TAG "empty project response"), -1;
157 if (project_from(project, 1, req.doc) < 0)
158 return log_warn(TAG "unable to parse project: %s", strerror(errno)), -1;
159 #endif
160
161 return 0;
162 } 137 }
163 138
164 static inline size_t 139 static inline size_t
165 count(const struct taskentry *head) 140 count(const struct taskentry *head)
166 { 141 {
178 * script. 153 * script.
179 */ 154 */
180 static int 155 static int
181 start(struct taskentry *entry) 156 start(struct taskentry *entry)
182 { 157 {
183 struct apic; 158 struct apic req;
184 struct project project; 159 struct project project;
185 pid_t pid; 160 pid_t pid;
186 161 int ret = -1;
187 if (apic_project_find_id(&project, entry->job.project_id) < 0) 162
163 if (apic_project_find(&req, &project, entry->job.project_name) < 0)
188 return log_warn(TAG "unable to fetch project, dropping task"), -1; 164 return log_warn(TAG "unable to fetch project, dropping task"), -1;
165
189 if (task_setup(entry->task, project.script) < 0) 166 if (task_setup(entry->task, project.script) < 0)
190 return log_warn(TAG "unable to setup script code: %s, dropping task", strerror(errno)), -1; 167 log_warn(TAG "unable to setup script code: %s, dropping task", strerror(errno));
191 if ((pid = task_start(entry->task)) < 0) 168 else if ((pid = task_start(entry->task)) < 0)
192 return log_warn(TAG "unable to spawn task process: %s", strerror(errno)), -1; 169 log_warn(TAG "unable to spawn task process: %s", strerror(errno));
193 170 else {
194 log_info(TAG "task %lld spawned", (long long int)pid); 171 log_info(TAG "task %lld spawned", (long long int)pid);
195 172 ret = 0;
196 return 0; 173 }
174
175 project_finish(&project);
176
177 return ret;
197 } 178 }
198 179
199 static inline void 180 static inline void
200 delete(struct taskentry *entry) 181 delete(struct taskentry *entry)
201 { 182 {
317 // TODO: add sigcode. 298 // TODO: add sigcode.
318 struct taskcode code = task_code(iter->task); 299 struct taskcode code = task_code(iter->task);
319 struct jobresult res = { 300 struct jobresult res = {
320 .job_id = iter->job.id, 301 .job_id = iter->job.id,
321 .exitcode = code.exitcode, 302 .exitcode = code.exitcode,
322 .log = task_console(iter->task), 303 .log = util_strdup(task_console(iter->task)),
323 .worker_id = worker.id 304 .worker_name = worker.name
324 }; 305 };
325 struct apicreq req; 306 struct apic req;
326 json_t *doc; 307 json_t *doc;
327 int ret; 308 int ret;
328 309
329 doc = jobresult_to(&res, 1); 310 doc = jobresult_to(&res, 1);
330 ret = apic_post(&req, doc, "%s/api/v1/jobs", sciworkerd.url); 311 ret = apic_post(&req, doc, "%s/api/v1/jobs", sciworkerd.url);
332 313
333 if (ret) 314 if (ret)
334 log_warn(TAG "unable to publish task: %s", req.error); 315 log_warn(TAG "unable to publish task: %s", req.error);
335 else 316 else
336 log_info(TAG "task successfully published"); 317 log_info(TAG "task successfully published");
318
319 jobresult_finish(&res);
320 apic_finish(&req);
337 321
338 return ret; 322 return ret;
339 } 323 }
340 324
341 static void 325 static void
358 342
359 log_open("sigworkerd"); 343 log_open("sigworkerd");
360 344
361 sigemptyset(&sa.sa_mask); 345 sigemptyset(&sa.sa_mask);
362 sa.sa_handler = stop; 346 sa.sa_handler = stop;
347
348 util_strlcpy(apiconf.baseurl, sciworkerd.url, sizeof (apiconf.baseurl));
363 349
364 if (sigaction(SIGINT, &sa, NULL) < 0 || sigaction(SIGTERM, &sa, NULL) < 0) 350 if (sigaction(SIGINT, &sa, NULL) < 0 || sigaction(SIGTERM, &sa, NULL) < 0)
365 log_die(TAG "sigaction: %s", strerror(errno)); 351 log_die(TAG "sigaction: %s", strerror(errno));
366 352
367 fetch_worker(); 353 fetch_worker();