comparison sciworkerd/sciworkerd.c @ 20:f98ea578b1ef

misc: revamp database
author David Demelier <markand@malikania.fr>
date Tue, 19 Jul 2022 21:52:42 +0200
parents de4bf839b565
children 2cb228f23f53
comparison
equal deleted inserted replaced
19:de4bf839b565 20:f98ea578b1ef
76 memcpy(&tk->job, job, sizeof (*job)); 76 memcpy(&tk->job, job, sizeof (*job));
77 LL_APPEND(taskpending, tk); 77 LL_APPEND(taskpending, tk);
78 } 78 }
79 79
80 static void 80 static void
81 merge(json_t *doc) 81 merge(const struct job *jobs, size_t jobsz)
82 { 82 {
83 struct job jobs[SCI_JOB_LIST_MAX];
84 ssize_t jobsz;
85 size_t total = 0; 83 size_t total = 0;
86 84
87 if ((jobsz = job_from(jobs, UTIL_SIZE(jobs), doc)) < 0) 85 for (ssize_t i = 0; i < jobsz; ++i) {
88 log_warn(TAG "error while parsing jobs: %s", strerror(errno)); 86 if (!pending(jobs[i].id)) {
89 else { 87 queue(&jobs[i]);
90 for (ssize_t i = 0; i < jobsz; ++i) { 88 total++;
91 if (!pending(jobs[i].id)) { 89 }
92 queue(&jobs[i]); 90 }
93 total++; 91
94 } 92 log_info(TAG "added %zu new pending tasks", total);
95 }
96
97 log_info(TAG "added %zu new pending tasks", total);
98 }
99 } 93 }
100 94
101 /* 95 /*
102 * Fetch jobs periodically, depending on the user setting. 96 * Fetch jobs periodically, depending on the user setting.
103 */ 97 */
104 static void 98 static void
105 fetch_jobs(void) 99 fetch_jobs(void)
106 { 100 {
107 static time_t startup; 101 static time_t startup;
108 time_t now; 102 time_t now;
109 struct apicreq req; 103 struct apic req;
104 struct job todo[SCI_JOB_LIST_MAX];
105 ssize_t todosz;
110 106
111 if (!startup) 107 if (!startup)
112 startup = time(NULL); 108 startup = time(NULL);
113 109
114 if (difftime((now = time(NULL)), startup) >= sciworkerd.fetchinterval) { 110 if (difftime((now = time(NULL)), startup) >= sciworkerd.fetchinterval) {
115 startup = now; 111 startup = now;
116 112
117 if (apic_get(&req, "%s/api/v1/%s", sciworkerd.url, sciworkerd.name) < 0) 113 if ((todosz = apic_job_todo(&req, todo, UTIL_SIZE(todo), worker.id)) < 0)
118 log_warn(TAG "unable to fetch jobs: %s", req.error); 114 log_warn(TAG "unable to fetch jobs: %s", req.error);
119 if (req.doc) { 115 else
120 merge(req.doc); 116 merge(todo, todosz);
121 json_decref(req.doc); 117
122 } 118 apic_finish(&req);
123 } 119 }
124 } 120 }
125 121
126 /* 122 /*
127 * Fetch information about myself. 123 * Fetch information about myself.
128 */ 124 */
129 static void 125 static void
130 fetch_worker(void) 126 fetch_worker(void)
131 { 127 {
132 struct apicreq req; 128 struct apic req;
133 129
134 if (apic_get(&req, "%s/api/v1/workers/%s", sciworkerd.url, sciworkerd.name) < 0) 130 util_strlcpy(&worker.name, sciworkerd.name);
135 log_warn(TAG "unable to fetch worker info: %s", req.error); 131
136 if (!req.doc) 132 if (apic_worker_find(&req, &worker) < 0)
137 log_die(TAG "empty worker response"); 133 log_die(TAG, "unable to fetch worker info: %s", req.error);
138 if (worker_from(&worker, 1, req.doc) < 0)
139 log_die(TAG "unable to parse worker", strerror(errno));
140 134
141 log_info("worker id: %d", worker.id); 135 log_info("worker id: %d", worker.id);
142 log_info("worker name: %s", worker.name); 136 log_info("worker name: %s", worker.name);
143 log_info("worker description: %s", worker.desc); 137 log_info("worker description: %s", worker.desc);
144 138
145 json_decref(req.doc); 139 apic_finish(&req);
146 } 140 }
147 141
148 /* 142 /*
149 * Fetch information about a project. 143 * Fetch information about a project.
150 */ 144 */
151 static int 145 static int
152 fetch_project(struct project *project, int id) 146 fetch_project(struct project *project, int id)
153 { 147 {
154 struct apicreq req; 148 struct apic req;
155 149
150 if (apic_project_find_id(&req, project, id) < 0)
151 return -1;
152 #if 0
156 if (apic_get(&req, "%s/api/v1/projects/%d", id) < 0) 153 if (apic_get(&req, "%s/api/v1/projects/%d", id) < 0)
157 return log_warn(TAG "unable to fetch project info: %s", req.error), -1; 154 return log_warn(TAG "unable to fetch project info: %s", req.error), -1;
158 if (!req.doc) 155 if (!req.doc)
159 return log_warn(TAG "empty project response"), -1; 156 return log_warn(TAG "empty project response"), -1;
160 if (project_from(project, 1, req.doc) < 0) 157 if (project_from(project, 1, req.doc) < 0)
161 return log_warn(TAG "unable to parse project: %s", strerror(errno)), -1; 158 return log_warn(TAG "unable to parse project: %s", strerror(errno)), -1;
159 #endif
162 160
163 return 0; 161 return 0;
164 } 162 }
165 163
166 static inline size_t 164 static inline size_t
180 * script. 178 * script.
181 */ 179 */
182 static int 180 static int
183 start(struct taskentry *entry) 181 start(struct taskentry *entry)
184 { 182 {
183 struct apic;
185 struct project project; 184 struct project project;
186 pid_t pid; 185 pid_t pid;
187 186
188 if (fetch_project(&project, entry->job.project_id) < 0) 187 if (apic_project_find_id(&project, entry->job.project_id) < 0)
189 return log_warn(TAG "unable to fetch project, dropping task"), -1; 188 return log_warn(TAG "unable to fetch project, dropping task"), -1;
190 if (task_setup(entry->task, project.script) < 0) 189 if (task_setup(entry->task, project.script) < 0)
191 return log_warn(TAG "unable to setup script code: %s, dropping task", strerror(errno)), -1; 190 return log_warn(TAG "unable to setup script code: %s, dropping task", strerror(errno)), -1;
192 if ((pid = task_start(entry->task)) < 0) 191 if ((pid = task_start(entry->task)) < 0)
193 return log_warn(TAG "unable to spawn task process: %s", strerror(errno)), -1; 192 return log_warn(TAG "unable to spawn task process: %s", strerror(errno)), -1;