1 /*
   2  * This file and its contents are supplied under the terms of the
   3  * Common Development and Distribution License ("CDDL"), version 1.0.
   4  * You may only use this file in accordance with the terms of version
   5  * 1.0 of the CDDL.
   6  *
   7  * A full copy of the text of the CDDL should have accompanied this
   8  * source.  A copy of the CDDL is also available via the Internet at
   9  * http://www.illumos.org/license/CDDL.
  10  */
  11 
  12 /*
  13  * Copyright 2017 Joyent, Inc.
  14  */
  15 
  16 #include <pthread.h>
  17 #include <umem.h>
  18 #include <err.h>
  19 #include <sys/debug.h>
  20 #include <bunyan.h>
  21 #include <time.h>
  22 #include "defs.h"
  23 #include "worker.h"
  24 #include "pkt.h"
  25 #include "timer.h"
  26 #include "pkcs11.h"
  27 #include "ikev2_proto.h"
  28 
  29 typedef enum worker_cmd {
  30         WC_NONE,
  31         WC_SUSPEND,
  32         WC_QUIT
  33 } worker_cmd_t;
  34 
  35 typedef struct worker_item {
  36         worker_evt_t    wi_event;
  37         void            *wi_data;
  38 } worker_item_t;
  39 
  40 typedef struct worker_queue {
  41         pthread_mutex_t wq_lock;
  42         pthread_cond_t  wq_cv;
  43         worker_cmd_t    wq_cmd;
  44         worker_item_t   *wq_items;
  45         size_t          wq_start;
  46         size_t          wq_end;
  47 } worker_queue_t;
  48 #define WQ_EMPTY(_wq) ((_wq)->wq_start == (_wq)->wq_end)
  49 #define WQ_FULL(_wq) ((((_wq)->wq_end + 1) % queuelen) == (_wq)->wq_start)
  50 
  51 typedef struct worker {
  52         pthread_t       w_tid;
  53         bunyan_logger_t *w_log;
  54         worker_queue_t  w_queue;
  55         boolean_t       w_done;
  56 } worker_t;
  57 
  58 static pthread_rwlock_t worker_lock = PTHREAD_RWLOCK_INITIALIZER;
  59 
  60 size_t  nworkers;
  61 static worker_t **workers;
  62 static size_t   workers_alloc;
  63 static size_t   queuelen;
  64 
  65 static volatile uint_t nsuspended;
  66 static pthread_mutex_t suspend_lock = PTHREAD_MUTEX_INITIALIZER;
  67 static pthread_cond_t suspend_cv = PTHREAD_COND_INITIALIZER;
  68 
  69 static worker_t *worker_init_one(size_t);
  70 static void *worker(void *);
  71 static const char *worker_cmd_str(worker_cmd_t);
  72 static const char *worker_evt_str(worker_evt_t);
  73 static void worker_pkt_inbound(pkt_t *);
  74 
  75 void
  76 worker_init(size_t n_workers, size_t queue_sz)
  77 {
  78         size_t len;
  79 
  80         /* to have a gcc with overflow checks... */
  81         len = n_workers * sizeof (worker_t *);
  82         VERIFY3U(len, >, n_workers);
  83         VERIFY3U(len, >, sizeof (worker_t *));
  84 
  85         workers = umem_zalloc(len, UMEM_DEFAULT);
  86         if (workers == NULL)
  87                 err(EXIT_FAILURE, "out of memory");
  88 
  89         nworkers = workers_alloc = n_workers;
  90 
  91         len = queue_sz * sizeof (worker_item_t *);
  92         VERIFY3U(len, >, queue_sz);
  93         VERIFY3U(len, >, sizeof (worker_item_t *));
  94         queuelen = queue_sz;
  95 
  96         for (size_t i = 0; i < nworkers; i++) {
  97                 worker_t *w = worker_init_one(len);
  98 
  99                 if (w == NULL)
 100                         err(EXIT_FAILURE, "out of memory");
 101 
 102                 bunyan_key_add(w->w_log, BUNYAN_T_UINT32, "worker",
 103                     (uint32_t)i);
 104 
 105                 workers[i] = w;
 106         }
 107 
 108         for (size_t i = 0; i < nworkers; i++) {
 109                 worker_t *w = workers[i];
 110                 PTH(pthread_create(&w->w_tid, NULL, worker, w));
 111         }
 112 }
 113 
 114 static worker_t *
 115 worker_init_one(size_t len)
 116 {
 117         worker_t *w = NULL;
 118 
 119         if ((w = umem_zalloc(sizeof (*w), UMEM_DEFAULT)) == NULL)
 120                 return (NULL);
 121 
 122         if ((w->w_queue.wq_items = umem_zalloc(len, UMEM_DEFAULT)) == NULL) {
 123                 umem_free(w, sizeof (*w));
 124                 return (NULL);
 125         }
 126 
 127         if (bunyan_child(log, &w->w_log, BUNYAN_T_END) != 0)
 128                 return (NULL);
 129 
 130         PTH(pthread_mutex_init(&w->w_queue.wq_lock, NULL));
 131         PTH(pthread_cond_init(&w->w_queue.wq_cv, NULL));
 132 
 133         return (w);
 134 }
 135 
 136 static boolean_t
 137 worker_send_cmd(size_t n, worker_cmd_t cmd)
 138 {
 139         ASSERT3U(n, <=, nworkers);
 140         ASSERT(RW_LOCK_HELD(&worker_lock));
 141 
 142         worker_t *w = workers[n];
 143         worker_queue_t *wq = &w->w_queue;
 144 
 145         PTH(pthread_mutex_lock(&wq->wq_lock));
 146         if (w->w_queue.wq_cmd != WC_NONE) {
 147                 PTH(pthread_mutex_unlock(&wq->wq_lock));
 148                 return (B_FALSE);
 149         }
 150 
 151         w->w_queue.wq_cmd = cmd;
 152         PTH(pthread_cond_signal(&wq->wq_cv));
 153         PTH(pthread_mutex_unlock(&wq->wq_lock));
 154 
 155         return (B_TRUE);
 156 }
 157 
 158 void
 159 worker_suspend(void)
 160 {
 161         PTH(pthread_rwlock_wrlock(&worker_lock));
 162         for (size_t i = 0; i < nworkers; i++) {
 163                 worker_t *w = workers[i];
 164                 worker_queue_t *wq = &w->w_queue;
 165                 PTH(pthread_mutex_lock(&wq->wq_lock));
 166                 w->w_queue.wq_cmd = WC_SUSPEND;
 167                 PTH(pthread_cond_signal(&wq->wq_cv));
 168                 PTH(pthread_mutex_unlock(&wq->wq_lock));
 169         }
 170         PTH(pthread_rwlock_unlock(&worker_lock));
 171 
 172         PTH(pthread_mutex_lock(&suspend_lock));
 173         while (nsuspended != nworkers)
 174                 PTH(pthread_cond_wait(&suspend_cv, &suspend_lock));
 175         PTH(pthread_mutex_unlock(&suspend_lock));
 176 }
 177 
 178 static void
 179 worker_do_suspend(worker_queue_t *wq)
 180 {
 181         PTH(pthread_mutex_lock(&suspend_lock));
 182         if (++nsuspended == nworkers)
 183                 PTH(pthread_cond_signal(&suspend_cv));
 184         PTH(pthread_mutex_unlock(&suspend_lock));
 185 
 186         PTH(pthread_mutex_lock(&wq->wq_lock));
 187         while (wq->wq_cmd == WC_SUSPEND)
 188                 PTH(pthread_cond_wait(&wq->wq_cv, &wq->wq_lock));
 189 
 190         /* leave wq->wq_lock locked */
 191 }
 192 
 193 void
 194 worker_resume(void)
 195 {
 196         PTH(pthread_rwlock_wrlock(&worker_lock));
 197         for (size_t i = 0; i < nworkers; i++) {
 198                 worker_t *w = workers[i];
 199                 worker_queue_t *wq = &w->w_queue;
 200                 PTH(pthread_mutex_lock(&wq->wq_lock));
 201                 wq->wq_cmd = WC_NONE;
 202                 PTH(pthread_mutex_unlock(&wq->wq_lock));
 203                 PTH(pthread_cond_broadcast(&wq->wq_cv));
 204         }
 205         PTH(pthread_rwlock_unlock(&worker_lock));
 206 }
 207 
 208 boolean_t
 209 worker_dispatch(worker_evt_t event, void *data, size_t n)
 210 {
 211         worker_t *w = NULL;
 212         worker_queue_t *wq = NULL;
 213         worker_item_t *wi = NULL;
 214 
 215         PTH(pthread_rwlock_rdlock(&worker_lock));
 216         VERIFY3U(n, <, nworkers);
 217         w = workers[n];
 218         wq = &w->w_queue;
 219         PTH(pthread_mutex_lock(&wq->wq_lock));
 220 
 221         if (WQ_FULL(wq)) {
 222                 PTH(pthread_mutex_unlock(&wq->wq_lock));
 223                 PTH(pthread_rwlock_unlock(&worker_lock));
 224 
 225                 (void) bunyan_debug(log, "dispatch failed (queue full)",
 226                     BUNYAN_T_UINT32, "worker", (uint32_t)n,
 227                     BUNYAN_T_STRING, "event", worker_evt_str(event),
 228                     BUNYAN_T_POINTER, "data", data,
 229                     BUNYAN_T_END);
 230                 return (B_FALSE);
 231         }
 232 
 233         wi = &wq->wq_items[wq->wq_end++];
 234         wi->wi_event = event;
 235         wi->wi_data = data;
 236         wq->wq_end %= queuelen;
 237 
 238         PTH(pthread_cond_signal(&wq->wq_cv));
 239         PTH(pthread_mutex_unlock(&wq->wq_lock));
 240         PTH(pthread_rwlock_unlock(&worker_lock));
 241 
 242         (void) bunyan_debug(w->w_log, "Dispatching packet to worker",
 243             BUNYAN_T_UINT32, "worker", (uint32_t)n,
 244             BUNYAN_T_STRING, "event", worker_evt_str(event),
 245             BUNYAN_T_POINTER, "data", data,
 246             BUNYAN_T_END);
 247         return (B_TRUE);
 248 }
 249 
 250 static void *
 251 worker(void *arg)
 252 {
 253         worker_t *w = arg;
 254         worker_queue_t *wq = &w->w_queue;
 255         timespec_t ts = { 0 };
 256         boolean_t done = B_FALSE;
 257 
 258         ike_timer_thread_init();
 259 
 260         (void) bunyan_trace(w->w_log, "Worker starting", BUNYAN_T_END);
 261 
 262         PTH(pthread_mutex_lock(&wq->wq_lock));
 263 
 264         /*CONSTCOND*/
 265         while (1) {
 266                 process_timer(&ts, w->w_log);
 267 
 268                 if (ts.tv_sec == 0 && ts.tv_nsec == 0) {
 269                         PTH(pthread_cond_wait(&wq->wq_cv, &wq->wq_lock));
 270                 } else {
 271                         int rc;
 272                         rc = pthread_cond_timedwait(&wq->wq_cv, &wq->wq_lock,
 273                             &ts);
 274                         VERIFY(rc == 0 || rc == ETIMEDOUT);
 275                 }
 276 
 277                 if (wq->wq_cmd != WC_NONE)
 278                         (void) bunyan_info(w->w_log, "Received command",
 279                             BUNYAN_T_STRING, "cmd", worker_cmd_str(wq->wq_cmd),
 280                             BUNYAN_T_UINT32, "cmdval", (uint32_t)wq->wq_cmd,
 281                             BUNYAN_T_END);
 282 
 283                 switch (wq->wq_cmd) {
 284                 case WC_NONE:
 285                         break;
 286                 case WC_SUSPEND:
 287                         worker_do_suspend(wq);
 288                         ASSERT(MUTEX_HELD(&wq->wq_lock));
 289                         continue;                       
 290                 case WC_QUIT:
 291                         done = B_TRUE;
 292                         break;
 293                 default:
 294                         INVALID("wq->wq_cmd");
 295                 }
 296 
 297                 if (done)
 298                         break;
 299 
 300                 while (!WQ_EMPTY(wq)) {
 301                         worker_item_t wi = wq->wq_items[wq->wq_start];
 302 
 303                         wq->wq_items[wq->wq_start].wi_event = EVT_NONE;
 304                         wq->wq_items[wq->wq_start].wi_data = NULL;
 305 
 306                         wq->wq_start++;
 307                         wq->wq_start %= queuelen;
 308                         PTH(pthread_mutex_unlock(&wq->wq_lock));
 309 
 310                         switch (wi.wi_event) {
 311                         case EVT_NONE:
 312                                 INVALID("wi.wi_event");
 313                                 break;
 314                         case EVT_PACKET:
 315                                 worker_pkt_inbound(wi.wi_data);
 316                                 break;
 317                         case EVT_PFKEY:
 318                                 /* TODO */
 319                                 break;
 320                         }
 321 
 322                         PTH(pthread_mutex_lock(&wq->wq_lock));
 323                 }
 324         }
 325 
 326         w->w_done;
 327         PTH(pthread_cond_signal(&wq->wq_cv));
 328         PTH(pthread_mutex_unlock(&wq->wq_lock));
 329         return (w);
 330 }
 331 
 332 static void
 333 worker_pkt_inbound(pkt_t *pkt)
 334 {
 335         switch (IKE_GET_MAJORV(pkt->pkt_header.version)) {
 336         case 1:
 337                 /* XXX: ikev1_inbound(pkt); */
 338                 break;
 339         case 2:
 340                 ikev2_inbound(pkt);
 341                 break;
 342         default:
 343                 /* XXX: log? */
 344                 pkt_free(pkt);
 345         }
 346 }
 347 
 348 boolean_t
 349 worker_add(void)
 350 {
 351         worker_t **new_workers = NULL;
 352         worker_t *w = NULL;
 353         size_t new_workers_alloc = 0;
 354         size_t len = 0, qlen = 0;
 355 
 356         (void) bunyan_trace(log, "Creating new worker", BUNYAN_T_END);
 357 
 358         if (workers_alloc == nworkers) {
 359                 new_workers_alloc = workers_alloc + 1;
 360                 len = new_workers_alloc * sizeof (worker_t *);
 361                 VERIFY3U(len, >, new_workers_alloc);
 362                 VERIFY3U(len, >, sizeof (worker_t *));
 363 
 364                 if ((new_workers = umem_zalloc(len, UMEM_DEFAULT)) == NULL)
 365                         return (B_FALSE);
 366 
 367         } else {
 368                 new_workers = workers;
 369                 new_workers_alloc = workers_alloc;
 370         }
 371 
 372         qlen = queuelen * sizeof (pkt_t *);
 373         VERIFY3U(qlen, >, queuelen);
 374         VERIFY3U(qlen, >, sizeof (pkt_t *));
 375 
 376         if ((w = worker_init_one(qlen)) == NULL) {
 377                 umem_free(new_workers, len);
 378                 return (B_FALSE);
 379         }
 380 
 381         PTH(pthread_create(&w->w_tid, NULL, worker, w));
 382 
 383         PTH(pthread_rwlock_wrlock(&worker_lock));
 384         workers = new_workers;
 385         workers_alloc = new_workers_alloc;
 386 
 387         VERIFY3U(workers_alloc, >, nworkers);
 388         workers[nworkers++] = w;
 389 
 390         PTH(pthread_rwlock_unlock(&worker_lock));
 391 
 392         (void) bunyan_debug(w->w_log, "Worker created", BUNYAN_T_END);
 393 
 394         return (B_TRUE);
 395 }
 396 
/*
 * Placeholder for removing a worker.  The body is empty, so calling this
 * currently has no effect.
 */
void
worker_del(void)
{
}
 401 
 402 #define STR(x) case x: return (#x)
 403 static const char *
 404 worker_cmd_str(worker_cmd_t wc)
 405 {
 406         switch (wc) {
 407         STR(WC_NONE);
 408         STR(WC_SUSPEND);
 409         STR(WC_QUIT);
 410         }
 411         return ("UNKNOWN");
 412 }
 413 
 414 static const char *
 415 worker_evt_str(worker_evt_t evt)
 416 {
 417         switch (evt) {
 418         STR(EVT_NONE);
 419         STR(EVT_PACKET);
 420         STR(EVT_PFKEY);
 421         }
 422         return ("UNKNOWN");
 423 }