1 /*
   2  * This file and its contents are supplied under the terms of the
   3  * Common Development and Distribution License ("CDDL"), version 1.0.
   4  * You may only use this file in accordance with the terms of version
   5  * 1.0 of the CDDL.
   6  *
   7  * A full copy of the text of the CDDL should have accompanied this
   8  * source.  A copy of the CDDL is also available via the Internet at
   9  * http://www.illumos.org/license/CDDL.
  10  */
  11 
  12 /*
  13  * Copyright 2016 Nexenta Systems, Inc.  All rights reserved.
  14  */
  15 
/*
 * A PDU is a high-level structure (a container) that holds a header, some
 * control information and a list of dblks that carry the payload.
 * The header and the dblks are what is transferred over the network.
 */
  21 
  22 #include "krrp_svc.h"
  23 #include "krrp_pdu.h"
  24 
  25 /* #define KRRP_PDU_DEBUG 1 */
  26 
/*
 * Module-wide PDU engine state.
 *
 * ref_cnt is incremented by krrp_pdu_engine_create() and decremented by
 * krrp_pdu_engine_destroy(), so it counts the PDU engines that currently
 * exist (the shared CTRL engine included).
 *
 * ctrl_pdu_engine is the engine created by krrp_pdu_engine_global_init()
 * and used by krrp_pdu_ctrl_alloc().
 */
static struct {
        uint64_t                        ref_cnt;
        krrp_pdu_engine_t       *ctrl_pdu_engine;
} krrp_global_pdu_engine_env = {0, NULL};

/* Frees a PDU; run directly or as a task dispatched by krrp_pdu_rele() */
static void krrp_pdu_rele_task(void *void_pdu);
/* Callback handed to the dblk engine by krrp_pdu_engine_create() */
static void krrp_pdu_engine_notify_cb(void *void_pdu_engine);
  34 
  35 int
  36 krrp_pdu_engine_global_init()
  37 {
  38         int rc;
  39         krrp_error_t error;
  40 
  41         VERIFY(krrp_global_pdu_engine_env.ref_cnt == 0);
  42         VERIFY(krrp_global_pdu_engine_env.ctrl_pdu_engine == NULL);
  43 
  44         /*
  45          * CTRL PDU Engine that will be used by all sessions:
  46          * preallocation: B_FALSE
  47          * max_memory: 100 MB (should be enough)
  48          * dblk_per_pdu: 1
  49          * dblk_head_sz: 0
  50          * dblk_data_sz: 2 KB
  51          * notify_free_cb: NULL
  52          * notify_free_cb_arg: NULL
  53          */
  54         rc = krrp_pdu_engine_create(&krrp_global_pdu_engine_env.ctrl_pdu_engine,
  55             B_TRUE, B_FALSE, 100, 1, 0, 2 * 1024, &error);
  56 
  57         return (rc);
  58 }
  59 
  60 void
  61 krrp_pdu_engine_global_fini()
  62 {
  63         VERIFY(krrp_global_pdu_engine_env.ctrl_pdu_engine != NULL);
  64         VERIFY(krrp_global_pdu_engine_env.ref_cnt == 1);
  65 
  66         krrp_pdu_engine_destroy(krrp_global_pdu_engine_env.ctrl_pdu_engine);
  67         krrp_global_pdu_engine_env.ctrl_pdu_engine = NULL;
  68 }
  69 
  70 void
  71 krrp_pdu_ctrl_alloc(krrp_pdu_ctrl_t **result_pdu, boolean_t with_header)
  72 {
  73         krrp_pdu_alloc(krrp_global_pdu_engine_env.ctrl_pdu_engine,
  74             (krrp_pdu_t **)result_pdu, with_header);
  75 }
  76 
  77 int
  78 krrp_pdu_engine_create(krrp_pdu_engine_t **result_engine, boolean_t ctrl,
  79     boolean_t prealloc, size_t max_memory, size_t dblks_per_pdu,
  80     size_t dblk_head_sz, size_t dblk_data_sz, krrp_error_t *error)
  81 {
  82         int rc;
  83         krrp_pdu_engine_t *engine;
  84         size_t total_dblk_sz, max_dblk_cnt;
  85 
  86         VERIFY(result_engine != NULL && *result_engine == NULL);
  87         VERIFY(max_memory != 0);
  88         VERIFY(dblk_data_sz != 0);
  89 
  90         engine = kmem_zalloc(sizeof (krrp_pdu_engine_t), KM_SLEEP);
  91 
  92         engine->type = ctrl ? KRRP_PET_CTRL : KRRP_PET_DATA;
  93 
  94         total_dblk_sz = dblk_head_sz + dblk_data_sz;
  95         if (dblks_per_pdu == 0)
  96                 engine->dblks_per_pdu =
  97                     (KRRP_PDU_DEFAULT_SIZE / total_dblk_sz) + 1;
  98         else
  99                 engine->dblks_per_pdu = dblks_per_pdu;
 100 
 101         engine->max_pdu_cnt = (max_memory * 1024 * 1024) /
 102             total_dblk_sz / engine->dblks_per_pdu;
 103 
 104         engine->pdu_data_sz = dblk_data_sz * engine->dblks_per_pdu;
 105 
 106         max_dblk_cnt = engine->max_pdu_cnt * engine->dblks_per_pdu;
 107         rc = krrp_dblk_engine_create(&engine->dblk_engine, prealloc,
 108             max_dblk_cnt, dblk_head_sz, dblk_data_sz,
 109             engine->dblks_per_pdu, &krrp_pdu_engine_notify_cb, engine,
 110             error);
 111         if (rc != 0) {
 112                 kmem_free(engine, sizeof (krrp_pdu_engine_t));
 113                 return (-1);
 114         }
 115 
 116         atomic_inc_64(&krrp_global_pdu_engine_env.ref_cnt);
 117 
 118         mutex_init(&engine->mtx, NULL, MUTEX_DEFAULT, NULL);
 119         cv_init(&engine->cv, NULL, CV_DEFAULT, NULL);
 120 
 121         cmn_err(CE_NOTE, "PDU Engine config: dblk_head_sz:[%lu], "
 122             "dblk_data_sz:[%lu], max_mem:[%lu MB], "
 123             "dblks_per_pdu:[%lu], prealloc:[%s]",
 124             dblk_head_sz, dblk_data_sz, max_memory,
 125             engine->dblks_per_pdu, (prealloc ? "YES" : "NO"));
 126 
 127         *result_engine = engine;
 128         return (0);
 129 }
 130 
/*
 * Destroys a PDU engine.
 *
 * The dblk engine is destroyed first, then the cv and the mutex of the
 * PDU engine; after that the global engine counter is decremented and
 * the engine itself is freed.
 */
void
krrp_pdu_engine_destroy(krrp_pdu_engine_t *engine)
{
        krrp_dblk_engine_destroy(engine->dblk_engine);

        /*
         * NOTE(review): the mutex is presumably taken here to wait for a
         * krrp_pdu_engine_notify_cb() call that may still hold it -
         * confirm against the dblk engine.
         */
        mutex_enter(&engine->mtx);

        cv_destroy(&engine->cv);

        mutex_exit(&engine->mtx);
        mutex_destroy(&engine->mtx);

        atomic_dec_64(&krrp_global_pdu_engine_env.ref_cnt);

        kmem_free(engine, sizeof (krrp_pdu_engine_t));
}
 147 
 148 void
 149 krrp_pdu_engine_register_callback(krrp_pdu_engine_t *engine,
 150     krrp_pdu_free_notify_cb_t *notify_cb, void *notify_cb_arg)
 151 {
 152         VERIFY(notify_cb != NULL);
 153         VERIFY(engine->notify_free.cb == NULL);
 154 
 155         mutex_enter(&engine->mtx);
 156 
 157         engine->notify_free.cb = notify_cb;
 158         engine->notify_free.cb_arg = notify_cb_arg;
 159         engine->notify_free.init_value = engine->max_pdu_cnt >> 3;
 160         if (engine->notify_free.init_value == 0)
 161                 engine->notify_free.init_value = 1;
 162 
 163         engine->notify_free.cnt = engine->notify_free.init_value;
 164 
 165         mutex_exit(&engine->mtx);
 166 }
 167 
 168 void
 169 krrp_pdu_engine_force_notify(krrp_pdu_engine_t *engine, boolean_t initial)
 170 {
 171         size_t value;
 172 
 173         mutex_enter(&engine->mtx);
 174 
 175         VERIFY(engine->notify_free.cb != NULL);
 176 
 177         if (initial) {
 178                 value = engine->max_pdu_cnt;
 179                 goto out;
 180         }
 181 
 182         value = engine->notify_free.init_value - engine->notify_free.cnt;
 183         if (value == 0 && engine->cur_pdu_cnt == 0)
 184                 value = engine->notify_free.init_value;
 185 
 186         engine->notify_free.cnt = engine->notify_free.init_value;
 187 
 188 out:
 189         if (value != 0)
 190                 engine->notify_free.cb(engine->notify_free.cb_arg, value);
 191 
 192         mutex_exit(&engine->mtx);
 193 }
 194 
 195 size_t
 196 krrp_pdu_engine_get_used_mem(krrp_pdu_engine_t *pdu_engine)
 197 {
 198         return (pdu_engine->pdu_data_sz * pdu_engine->cur_pdu_cnt);
 199 }
 200 
 201 size_t
 202 krrp_pdu_engine_get_free_mem(krrp_pdu_engine_t *pdu_engine)
 203 {
 204         size_t free_pdus;
 205 
 206         /*
 207          * Mutex is not required here, because this
 208          * function is called from zfs-send context,
 209          * that does allocation of PDUs.
 210          *
 211          * Without mutex sometimes free mem may be less
 212          * than it actually is, but this is noncritical.
 213          */
 214         free_pdus = pdu_engine->max_pdu_cnt - pdu_engine->cur_pdu_cnt;
 215 
 216         return (pdu_engine->pdu_data_sz * free_pdus);
 217 }
 218 
 219 void
 220 krrp_pdu_alloc(krrp_pdu_engine_t *pdu_engine, krrp_pdu_t **result_pdu,
 221     boolean_t alloc_header)
 222 {
 223         clock_t time_left = 0;
 224         krrp_dblk_engine_t *dblk_engine;
 225         krrp_pdu_t *pdu = NULL;
 226 
 227         VERIFY(pdu_engine != NULL);
 228         VERIFY(result_pdu != NULL && *result_pdu == NULL);
 229 
 230 #ifdef KRRP_PDU_DEBUG
 231         cmn_err(CE_NOTE, "Init new PDU-[%s]",
 232             (pdu_engine->type == KRRP_PET_DATA ? "DATA" : "CTRL"));
 233 #endif
 234 
 235         dblk_engine = pdu_engine->dblk_engine;
 236 
 237         mutex_enter(&pdu_engine->mtx);
 238         while ((pdu_engine->max_pdu_cnt - pdu_engine->cur_pdu_cnt) == 0) {
 239                 if (time_left == -1) {
 240                         mutex_exit(&pdu_engine->mtx);
 241                         return;
 242                 }
 243 
 244                 time_left = cv_reltimedwait(&pdu_engine->cv,
 245                     &pdu_engine->mtx, MSEC_TO_TICK(500), TR_CLOCK_TICK);
 246         }
 247 
 248         pdu_engine->cur_pdu_cnt++;
 249         mutex_exit(&pdu_engine->mtx);
 250 
 251         switch (pdu_engine->type) {
 252         case KRRP_PET_DATA:
 253                 pdu = kmem_zalloc(sizeof (krrp_pdu_data_t), KM_SLEEP);
 254                 pdu->type = KRRP_PT_DATA;
 255                 break;
 256         case KRRP_PET_CTRL:
 257                 pdu = kmem_zalloc(sizeof (krrp_pdu_ctrl_t), KM_SLEEP);
 258                 pdu->type = KRRP_PT_CTRL;
 259                 break;
 260         }
 261 
 262         pdu->max_data_sz = pdu_engine->dblks_per_pdu *
 263             dblk_engine->dblk_data_sz;
 264 
 265         if (alloc_header)
 266                 pdu->hdr = kmem_zalloc(sizeof (krrp_hdr_t), KM_SLEEP);
 267 
 268         krrp_dblk_alloc(dblk_engine, &pdu->dblk,
 269             pdu_engine->dblks_per_pdu);
 270 
 271         /*
 272          * Counter of free PDU is updated by events from dblk-engine,
 273          * so if we here, then dblk-engine must not return NULL
 274          */
 275         VERIFY(pdu->dblk != NULL);
 276 
 277         *result_pdu = pdu;
 278 }
 279 
 280 void
 281 krrp_pdu_rele(krrp_pdu_t *pdu)
 282 {
 283         /*
 284          * Async rele make sense only if PDU has dblks,
 285          * because dblk_rele logic uses locks
 286          *
 287          * PDU has dblks at receiver side, because PDU initialized
 288          * by connection and released by stream
 289          *
 290          * At sender side PDU does not contain dblk, because PDU intialized
 291          * by stream and released by connection, but connection passes dblks
 292          * to TCP/IP stack, where dblks are processed and released.
 293          */
 294         if (pdu->dblk == NULL) {
 295                 krrp_pdu_rele_task(pdu);
 296                 return;
 297         }
 298 
 299         krrp_svc_dispatch_task(krrp_pdu_rele_task, pdu);
 300 }
 301 
 302 static void
 303 krrp_pdu_rele_task(void *void_pdu)
 304 {
 305         krrp_pdu_t *pdu = void_pdu;
 306 
 307 #ifdef KRRP_PDU_DEBUG
 308         cmn_err(CE_NOTE, "RELE PDU: [%d]", pdu->type);
 309 #endif
 310 
 311         if (pdu->dblk != NULL) {
 312                 krrp_dblk_rele(pdu->dblk);
 313                 pdu->dblk = NULL;
 314         }
 315 
 316         if (pdu->hdr != NULL) {
 317                 kmem_free(pdu->hdr, sizeof (krrp_hdr_t));
 318                 pdu->hdr = NULL;
 319         }
 320 
 321         switch (pdu->type) {
 322         case KRRP_PT_DATA:
 323                 kmem_free(pdu, sizeof (krrp_pdu_data_t));
 324                 break;
 325         case KRRP_PT_CTRL:
 326                 kmem_free(pdu, sizeof (krrp_pdu_ctrl_t));
 327                 break;
 328         }
 329 }
 330 
 331 static void
 332 krrp_pdu_engine_notify_cb(void *void_pdu_engine)
 333 {
 334         krrp_pdu_engine_t *engine = void_pdu_engine;
 335 
 336         mutex_enter(&engine->mtx);
 337 
 338 #ifdef KRRP_PDU_DEBUG
 339         cmn_err(CE_NOTE, "Dblk rele notification [%d]", engine->type);
 340 #endif
 341 
 342         ASSERT(engine->cur_pdu_cnt > 0);
 343 
 344         engine->cur_pdu_cnt--;
 345         if (engine->notify_free.cb != NULL) {
 346                 engine->notify_free.cnt--;
 347                 if (engine->notify_free.cnt == 0) {
 348                         engine->notify_free.cnt =
 349                             engine->notify_free.init_value;
 350                         engine->notify_free.cb(engine->notify_free.cb_arg,
 351                             engine->notify_free.cnt);
 352                 }
 353         }
 354 
 355         cv_broadcast(&engine->cv);
 356         mutex_exit(&engine->mtx);
 357 }
 358 
 359 int
 360 krrp_pdu_get_payload(krrp_pdu_ctrl_t *pdu, void **res_data, size_t *res_data_sz)
 361 {
 362         size_t payload_sz;
 363         void *data;
 364         int rc;
 365 
 366         ASSERT(pdu != NULL && pdu->hdr != NULL);
 367         ASSERT(res_data != NULL && res_data_sz != NULL);
 368 
 369         payload_sz = pdu->hdr->payload_sz;
 370         if (payload_sz == 0) {
 371                 *res_data = NULL;
 372                 *res_data_sz = 0;
 373                 return (0);
 374         }
 375 
 376         data = kmem_alloc(payload_sz, KM_SLEEP);
 377         rc = krrp_dblk_get_data(pdu->dblk, data, payload_sz);
 378         if (rc != 0) {
 379                 kmem_free(data, payload_sz);
 380                 *res_data = NULL;
 381                 *res_data_sz = 0;
 382                 return (rc);
 383         }
 384 
 385         *res_data = data;
 386         *res_data_sz = payload_sz;
 387         return (0);
 388 }
 389 
 390 int
 391 krrp_pdu_get_nvl_from_payload(krrp_pdu_ctrl_t *pdu, nvlist_t **res_nvl)
 392 {
 393         void *data = NULL;
 394         size_t data_sz = 0;
 395         int rc;
 396 
 397         rc = krrp_pdu_get_payload(pdu, &data, &data_sz);
 398         if (rc != 0)
 399                 return (rc);
 400 
 401         rc = nvlist_unpack(data, data_sz, res_nvl, KM_SLEEP);
 402 
 403         kmem_free(data, data_sz);
 404 
 405         return (rc);
 406 }