1 /*
2 * This file and its contents are supplied under the terms of the
3 * Common Development and Distribution License ("CDDL"), version 1.0.
4 * You may only use this file in accordance with the terms of version
5 * 1.0 of the CDDL.
6 *
7 * A full copy of the text of the CDDL should have accompanied this
8 * source. A copy of the CDDL is also available via the Internet at
9 * http://www.illumos.org/license/CDDL.
10 */
11
12 /*
13 * Copyright 2017 Joyent, Inc.
14 */
15
16 #include <pthread.h>
17 #include <umem.h>
18 #include <err.h>
19 #include <sys/debug.h>
20 #include <bunyan.h>
21 #include <time.h>
22 #include "defs.h"
23 #include "worker.h"
24 #include "pkt.h"
25 #include "timer.h"
26 #include "pkcs11.h"
27 #include "ikev2_proto.h"
28
29 typedef enum worker_cmd {
30 WC_NONE,
31 WC_SUSPEND,
32 WC_QUIT
33 } worker_cmd_t;
34
35 typedef struct worker_item {
36 worker_evt_t wi_event;
37 void *wi_data;
38 } worker_item_t;
39
40 typedef struct worker_queue {
41 pthread_mutex_t wq_lock;
42 pthread_cond_t wq_cv;
43 worker_cmd_t wq_cmd;
44 worker_item_t *wq_items;
45 size_t wq_start;
46 size_t wq_end;
47 } worker_queue_t;
48 #define WQ_EMPTY(_wq) ((_wq)->wq_start == (_wq)->wq_end)
49 #define WQ_FULL(_wq) ((((_wq)->wq_end + 1) % queuelen) == (_wq)->wq_start)
50
51 typedef struct worker {
52 pthread_t w_tid;
53 bunyan_logger_t *w_log;
54 worker_queue_t w_queue;
55 boolean_t w_done;
56 } worker_t;
57
58 static pthread_rwlock_t worker_lock = PTHREAD_RWLOCK_INITIALIZER;
59
60 size_t nworkers;
61 static worker_t **workers;
62 static size_t workers_alloc;
63 static size_t queuelen;
64
65 static volatile uint_t nsuspended;
66 static pthread_mutex_t suspend_lock = PTHREAD_MUTEX_INITIALIZER;
67 static pthread_cond_t suspend_cv = PTHREAD_COND_INITIALIZER;
68
69 static worker_t *worker_init_one(size_t);
70 static void *worker(void *);
71 static const char *worker_cmd_str(worker_cmd_t);
72 static const char *worker_evt_str(worker_evt_t);
73 static void worker_pkt_inbound(pkt_t *);
74
75 void
76 worker_init(size_t n_workers, size_t queue_sz)
77 {
78 size_t len;
79
80 /* to have a gcc with overflow checks... */
81 len = n_workers * sizeof (worker_t *);
82 VERIFY3U(len, >, n_workers);
83 VERIFY3U(len, >, sizeof (worker_t *));
84
85 workers = umem_zalloc(len, UMEM_DEFAULT);
86 if (workers == NULL)
87 err(EXIT_FAILURE, "out of memory");
88
89 nworkers = workers_alloc = n_workers;
90
91 len = queue_sz * sizeof (worker_item_t *);
92 VERIFY3U(len, >, queue_sz);
93 VERIFY3U(len, >, sizeof (worker_item_t *));
94 queuelen = queue_sz;
95
96 for (size_t i = 0; i < nworkers; i++) {
97 worker_t *w = worker_init_one(len);
98
99 if (w == NULL)
100 err(EXIT_FAILURE, "out of memory");
101
102 bunyan_key_add(w->w_log, BUNYAN_T_UINT32, "worker",
103 (uint32_t)i);
104
105 workers[i] = w;
106 }
107
108 for (size_t i = 0; i < nworkers; i++) {
109 worker_t *w = workers[i];
110 PTH(pthread_create(&w->w_tid, NULL, worker, w));
111 }
112 }
113
114 static worker_t *
115 worker_init_one(size_t len)
116 {
117 worker_t *w = NULL;
118
119 if ((w = umem_zalloc(sizeof (*w), UMEM_DEFAULT)) == NULL)
120 return (NULL);
121
122 if ((w->w_queue.wq_items = umem_zalloc(len, UMEM_DEFAULT)) == NULL) {
123 umem_free(w, sizeof (*w));
124 return (NULL);
125 }
126
127 if (bunyan_child(log, &w->w_log, BUNYAN_T_END) != 0)
128 return (NULL);
129
130 PTH(pthread_mutex_init(&w->w_queue.wq_lock, NULL));
131 PTH(pthread_cond_init(&w->w_queue.wq_cv, NULL));
132
133 return (w);
134 }
135
136 static boolean_t
137 worker_send_cmd(size_t n, worker_cmd_t cmd)
138 {
139 ASSERT3U(n, <=, nworkers);
140 ASSERT(RW_LOCK_HELD(&worker_lock));
141
142 worker_t *w = workers[n];
143 worker_queue_t *wq = &w->w_queue;
144
145 PTH(pthread_mutex_lock(&wq->wq_lock));
146 if (w->w_queue.wq_cmd != WC_NONE) {
147 PTH(pthread_mutex_unlock(&wq->wq_lock));
148 return (B_FALSE);
149 }
150
151 w->w_queue.wq_cmd = cmd;
152 PTH(pthread_cond_signal(&wq->wq_cv));
153 PTH(pthread_mutex_unlock(&wq->wq_lock));
154
155 return (B_TRUE);
156 }
157
158 void
159 worker_suspend(void)
160 {
161 PTH(pthread_rwlock_wrlock(&worker_lock));
162 for (size_t i = 0; i < nworkers; i++) {
163 worker_t *w = workers[i];
164 worker_queue_t *wq = &w->w_queue;
165 PTH(pthread_mutex_lock(&wq->wq_lock));
166 w->w_queue.wq_cmd = WC_SUSPEND;
167 PTH(pthread_cond_signal(&wq->wq_cv));
168 PTH(pthread_mutex_unlock(&wq->wq_lock));
169 }
170 PTH(pthread_rwlock_unlock(&worker_lock));
171
172 PTH(pthread_mutex_lock(&suspend_lock));
173 while (nsuspended != nworkers)
174 PTH(pthread_cond_wait(&suspend_cv, &suspend_lock));
175 PTH(pthread_mutex_unlock(&suspend_lock));
176 }
177
178 static void
179 worker_do_suspend(worker_queue_t *wq)
180 {
181 PTH(pthread_mutex_lock(&suspend_lock));
182 if (++nsuspended == nworkers)
183 PTH(pthread_cond_signal(&suspend_cv));
184 PTH(pthread_mutex_unlock(&suspend_lock));
185
186 PTH(pthread_mutex_lock(&wq->wq_lock));
187 while (wq->wq_cmd == WC_SUSPEND)
188 PTH(pthread_cond_wait(&wq->wq_cv, &wq->wq_lock));
189
190 /* leave wq->wq_lock locked */
191 }
192
193 void
194 worker_resume(void)
195 {
196 PTH(pthread_rwlock_wrlock(&worker_lock));
197 for (size_t i = 0; i < nworkers; i++) {
198 worker_t *w = workers[i];
199 worker_queue_t *wq = &w->w_queue;
200 PTH(pthread_mutex_lock(&wq->wq_lock));
201 wq->wq_cmd = WC_NONE;
202 PTH(pthread_mutex_unlock(&wq->wq_lock));
203 PTH(pthread_cond_broadcast(&wq->wq_cv));
204 }
205 PTH(pthread_rwlock_unlock(&worker_lock));
206 }
207
208 boolean_t
209 worker_dispatch(worker_evt_t event, void *data, size_t n)
210 {
211 worker_t *w = NULL;
212 worker_queue_t *wq = NULL;
213 worker_item_t *wi = NULL;
214
215 PTH(pthread_rwlock_rdlock(&worker_lock));
216 VERIFY3U(n, <, nworkers);
217 w = workers[n];
218 wq = &w->w_queue;
219 PTH(pthread_mutex_lock(&wq->wq_lock));
220
221 if (WQ_FULL(wq)) {
222 PTH(pthread_mutex_unlock(&wq->wq_lock));
223 PTH(pthread_rwlock_unlock(&worker_lock));
224
225 (void) bunyan_debug(log, "dispatch failed (queue full)",
226 BUNYAN_T_UINT32, "worker", (uint32_t)n,
227 BUNYAN_T_STRING, "event", worker_evt_str(event),
228 BUNYAN_T_POINTER, "data", data,
229 BUNYAN_T_END);
230 return (B_FALSE);
231 }
232
233 wi = &wq->wq_items[wq->wq_end++];
234 wi->wi_event = event;
235 wi->wi_data = data;
236 wq->wq_end %= queuelen;
237
238 PTH(pthread_cond_signal(&wq->wq_cv));
239 PTH(pthread_mutex_unlock(&wq->wq_lock));
240 PTH(pthread_rwlock_unlock(&worker_lock));
241
242 (void) bunyan_debug(w->w_log, "Dispatching packet to worker",
243 BUNYAN_T_UINT32, "worker", (uint32_t)n,
244 BUNYAN_T_STRING, "event", worker_evt_str(event),
245 BUNYAN_T_POINTER, "data", data,
246 BUNYAN_T_END);
247 return (B_TRUE);
248 }
249
250 static void *
251 worker(void *arg)
252 {
253 worker_t *w = arg;
254 worker_queue_t *wq = &w->w_queue;
255 timespec_t ts = { 0 };
256 boolean_t done = B_FALSE;
257
258 ike_timer_thread_init();
259
260 (void) bunyan_trace(w->w_log, "Worker starting", BUNYAN_T_END);
261
262 PTH(pthread_mutex_lock(&wq->wq_lock));
263
264 /*CONSTCOND*/
265 while (1) {
266 process_timer(&ts, w->w_log);
267
268 if (ts.tv_sec == 0 && ts.tv_nsec == 0) {
269 PTH(pthread_cond_wait(&wq->wq_cv, &wq->wq_lock));
270 } else {
271 int rc;
272 rc = pthread_cond_timedwait(&wq->wq_cv, &wq->wq_lock,
273 &ts);
274 VERIFY(rc == 0 || rc == ETIMEDOUT);
275 }
276
277 if (wq->wq_cmd != WC_NONE)
278 (void) bunyan_info(w->w_log, "Received command",
279 BUNYAN_T_STRING, "cmd", worker_cmd_str(wq->wq_cmd),
280 BUNYAN_T_UINT32, "cmdval", (uint32_t)wq->wq_cmd,
281 BUNYAN_T_END);
282
283 switch (wq->wq_cmd) {
284 case WC_NONE:
285 break;
286 case WC_SUSPEND:
287 worker_do_suspend(wq);
288 ASSERT(MUTEX_HELD(&wq->wq_lock));
289 continue;
290 case WC_QUIT:
291 done = B_TRUE;
292 break;
293 default:
294 INVALID("wq->wq_cmd");
295 }
296
297 if (done)
298 break;
299
300 while (!WQ_EMPTY(wq)) {
301 worker_item_t wi = wq->wq_items[wq->wq_start];
302
303 wq->wq_items[wq->wq_start].wi_event = EVT_NONE;
304 wq->wq_items[wq->wq_start].wi_data = NULL;
305
306 wq->wq_start++;
307 wq->wq_start %= queuelen;
308 PTH(pthread_mutex_unlock(&wq->wq_lock));
309
310 switch (wi.wi_event) {
311 case EVT_NONE:
312 INVALID("wi.wi_event");
313 break;
314 case EVT_PACKET:
315 worker_pkt_inbound(wi.wi_data);
316 break;
317 case EVT_PFKEY:
318 /* TODO */
319 break;
320 }
321
322 PTH(pthread_mutex_lock(&wq->wq_lock));
323 }
324 }
325
326 w->w_done;
327 PTH(pthread_cond_signal(&wq->wq_cv));
328 PTH(pthread_mutex_unlock(&wq->wq_lock));
329 return (w);
330 }
331
332 static void
333 worker_pkt_inbound(pkt_t *pkt)
334 {
335 switch (IKE_GET_MAJORV(pkt->pkt_header.version)) {
336 case 1:
337 /* XXX: ikev1_inbound(pkt); */
338 break;
339 case 2:
340 ikev2_inbound(pkt);
341 break;
342 default:
343 /* XXX: log? */
344 pkt_free(pkt);
345 }
346 }
347
348 boolean_t
349 worker_add(void)
350 {
351 worker_t **new_workers = NULL;
352 worker_t *w = NULL;
353 size_t new_workers_alloc = 0;
354 size_t len = 0, qlen = 0;
355
356 (void) bunyan_trace(log, "Creating new worker", BUNYAN_T_END);
357
358 if (workers_alloc == nworkers) {
359 new_workers_alloc = workers_alloc + 1;
360 len = new_workers_alloc * sizeof (worker_t *);
361 VERIFY3U(len, >, new_workers_alloc);
362 VERIFY3U(len, >, sizeof (worker_t *));
363
364 if ((new_workers = umem_zalloc(len, UMEM_DEFAULT)) == NULL)
365 return (B_FALSE);
366
367 } else {
368 new_workers = workers;
369 new_workers_alloc = workers_alloc;
370 }
371
372 qlen = queuelen * sizeof (pkt_t *);
373 VERIFY3U(qlen, >, queuelen);
374 VERIFY3U(qlen, >, sizeof (pkt_t *));
375
376 if ((w = worker_init_one(qlen)) == NULL) {
377 umem_free(new_workers, len);
378 return (B_FALSE);
379 }
380
381 PTH(pthread_create(&w->w_tid, NULL, worker, w));
382
383 PTH(pthread_rwlock_wrlock(&worker_lock));
384 workers = new_workers;
385 workers_alloc = new_workers_alloc;
386
387 VERIFY3U(workers_alloc, >, nworkers);
388 workers[nworkers++] = w;
389
390 PTH(pthread_rwlock_unlock(&worker_lock));
391
392 (void) bunyan_debug(w->w_log, "Worker created", BUNYAN_T_END);
393
394 return (B_TRUE);
395 }
396
397 void
398 worker_del(void)
399 {
400 }
401
402 #define STR(x) case x: return (#x)
403 static const char *
404 worker_cmd_str(worker_cmd_t wc)
405 {
406 switch (wc) {
407 STR(WC_NONE);
408 STR(WC_SUSPEND);
409 STR(WC_QUIT);
410 }
411 return ("UNKNOWN");
412 }
413
414 static const char *
415 worker_evt_str(worker_evt_t evt)
416 {
417 switch (evt) {
418 STR(EVT_NONE);
419 STR(EVT_PACKET);
420 STR(EVT_PFKEY);
421 }
422 return ("UNKNOWN");
423 }