1 /*
   2  * This file and its contents are supplied under the terms of the
   3  * Common Development and Distribution License ("CDDL"), version 1.0.
   4  * You may only use this file in accordance with the terms of version
   5  * 1.0 of the CDDL.
   6  *
   7  * A full copy of the text of the CDDL should have accompanied this
   8  * source.  A copy of the CDDL is also available via the Internet at
   9  * http://www.illumos.org/license/CDDL.
  10  */
  11 
  12 /*
  13  * Copyright 2018 Nexenta Systems, Inc.  All rights reserved.
  14  */
  15 
  16 /*
  17  * Stream-tasks module.
  18  * One stream-task is one send/recv operation.
  19  */
  20 
  21 #include <sys/types.h>
  22 #include <sys/kmem.h>
  23 #include <sys/ddi.h>
  24 #include <sys/sunddi.h>
  25 #include <sys/sunldi.h>
  26 #include <sys/time.h>
  27 #include <sys/strsubr.h>
  28 #include <sys/sysmacros.h>
  29 #include <sys/socketvar.h>
  30 #include <sys/ksocket.h>
  31 #include <sys/filio.h>
  32 #include <sys/modctl.h>
  33 #include <sys/class.h>
  34 #include <sys/cmn_err.h>
  35 #include <inet/ip.h>
  36 
  37 #include <sys/dmu_impl.h>
  38 
  39 #include <sys/kreplication_common.h>
  40 
  41 #include "krrp_stream_task.h"
  42 
/*
 * Sentinel txg stamped on every fake (dataset-less) read task and, through
 * it, on the PDUs that task produces.
 */
#define KRRP_FAKE_TXG 0x9BC1546914997F6C
/*
 * Microseconds -> nanoseconds as an hrtime_t.
 * NOTE(review): <sys/time.h> may already provide an identical USEC2NSEC;
 * an identical redefinition is permitted by C, but confirm.
 */
#define USEC2NSEC(m) ((hrtime_t)(m) * (NANOSEC / MICROSEC))

/*
 * Uncomment to turn the ZFS return codes that are otherwise passed through
 * or ignored by the handlers below into VERIFY failures.
 */
/* #define KRRP_STREAM_TASK_DEBUG 1 */

/* Flag test helpers implemented elsewhere in KRRP. */
extern boolean_t krrp_stream_is_write_flag_set(krrp_stream_write_flag_t flags,
    krrp_stream_write_flag_t flag);
extern boolean_t krrp_stream_is_read_flag_set(krrp_stream_read_flag_t flags,
    krrp_stream_read_flag_t flag);

static int krrp_stream_te_common_create(krrp_stream_te_t **result_te,
    const char *dataset, boolean_t read_mode, krrp_error_t *error);

/* kmem cache constructor/destructor for krrp_stream_task_t objects */
static int krrp_stream_task_constructor(void *, void *, int);
static void krrp_stream_task_destructor(void *, void *);

/* start/shutdown hooks installed into tasks by the constructor */
static krrp_stream_task_shandler_t krrp_stream_task_read_start;
static krrp_stream_task_shandler_t krrp_stream_task_common_stop;

static krrp_stream_task_shandler_t krrp_stream_task_fake_common_action;

/* per-PDU processing hooks installed into tasks by the constructor */
static krrp_stream_task_handler_t krrp_stream_task_fake_write_handler;
static krrp_stream_task_handler_t krrp_stream_task_write_handler;
static krrp_stream_task_handler_t krrp_stream_task_fake_read_handler;
static krrp_stream_task_handler_t krrp_stream_task_read_handler;

static void krrp_stream_task_correct_total_size(krrp_dblk_t *, size_t *);

static void krrp_stream_task_fake_rate_limit(krrp_stream_task_t *,
    hrtime_t, uint32_t);

/*
 * Tunable: throughput cap in MiB per second applied by the fake
 * (dataset-less) read/write handlers.  0 disables throttling.
 */
uint32_t krrp_stream_fake_rate_limit_mb = 0;
  75 
  76 int
  77 krrp_stream_te_read_create(krrp_stream_te_t **result_te,
  78     const char *dataset, krrp_stream_read_flag_t flags,
  79     krrp_check_enough_mem *mem_check_cb, void *mem_check_cb_arg,
  80     const char *skip_snaps_mask, krrp_error_t *error)
  81 {
  82         int rc;
  83         krrp_stream_te_t *task_engine;
  84 
  85         ASSERT(dataset != NULL);
  86 
  87         rc = krrp_stream_te_common_create(result_te,
  88             dataset, B_TRUE, error);
  89         if (rc != 0)
  90                 return (-1);
  91 
  92         task_engine = *result_te;
  93 
  94         if (skip_snaps_mask != NULL) {
  95                 char buf[ZAP_MAXNAMELEN + ZAP_MAXVALUELEN + 1];
  96                 char *eq_sym;
  97 
  98                 if (strlcpy(buf, skip_snaps_mask,
  99                     sizeof (buf)) >= sizeof (buf)) {
 100                         krrp_error_set(error, KRRP_ERRNO_SKIP_SNAPS_MASK, EMSGSIZE);
 101                         return (-1);
 102                 }
 103 
 104                 eq_sym = strchr(buf, '=');
 105                 if (eq_sym != NULL) {
 106                         *eq_sym = '\0';
 107                         eq_sym++;
 108                 }
 109 
 110                 if (eq_sym == NULL || *eq_sym == '\0') {
 111                         krrp_error_set(error, KRRP_ERRNO_SKIP_SNAPS_MASK, EINVAL);
 112                         return (-1);
 113                 }
 114 
 115                 if (strlcpy(task_engine->skip_snaps_prop_name, buf,
 116                     sizeof (task_engine->skip_snaps_prop_name)) >=
 117                     sizeof (task_engine->skip_snaps_prop_name)) {
 118                         krrp_error_set(error, KRRP_ERRNO_SKIP_SNAPS_MASK, ENAMETOOLONG);
 119                         return (-1);
 120                 }
 121 
 122                 if (strlcpy(task_engine->skip_snaps_prop_val, eq_sym,
 123                     sizeof (task_engine->skip_snaps_prop_val)) >=
 124                     sizeof (task_engine->skip_snaps_prop_val)) {
 125                         krrp_error_set(error, KRRP_ERRNO_SKIP_SNAPS_MASK, E2BIG);
 126                         return (-1);
 127                 }
 128         }
 129 
 130         task_engine->recursive =
 131             krrp_stream_is_read_flag_set(flags, KRRP_STRMRF_RECURSIVE);
 132         task_engine->properties =
 133             krrp_stream_is_read_flag_set(flags, KRRP_STRMRF_SEND_PROPS);
 134         task_engine->enable_cksum =
 135             krrp_stream_is_read_flag_set(flags, KRRP_STRMRF_ENABLE_CHKSUM);
 136         task_engine->embedded =
 137             krrp_stream_is_read_flag_set(flags, KRRP_STRMRF_EMBEDDED);
 138         task_engine->compressed =
 139             krrp_stream_is_read_flag_set(flags, KRRP_STRMRF_COMPRESSED);
 140         task_engine->large_blocks =
 141             krrp_stream_is_read_flag_set(flags, KRRP_STRMRF_LARGE_BLOCKS);
 142         task_engine->incremental_package =
 143             krrp_stream_is_read_flag_set(flags, KRRP_STRMRF_SEND_ALL_SNAPS);
 144 
 145         task_engine->mem_check_cb = mem_check_cb;
 146         task_engine->mem_check_cb_arg = mem_check_cb_arg;
 147 
 148         return (0);
 149 }
 150 
 151 int
 152 krrp_stream_te_write_create(krrp_stream_te_t **result_te,
 153     const char *dataset, krrp_stream_write_flag_t flags,
 154     nvlist_t *ignore_props_list, nvlist_t *replace_props_list,
 155     krrp_error_t *error)
 156 {
 157         int rc;
 158         krrp_stream_te_t *task_engine;
 159 
 160         ASSERT(dataset != NULL);
 161 
 162         rc = krrp_stream_te_common_create(result_te,
 163             dataset, B_FALSE, error);
 164         if (rc != 0)
 165                 return (-1);
 166 
 167         task_engine = *result_te;
 168 
 169         task_engine->force_receive =
 170             krrp_stream_is_write_flag_set(flags, KRRP_STRMWF_FORCE_RECV);
 171         task_engine->enable_cksum =
 172             krrp_stream_is_write_flag_set(flags, KRRP_STRMWF_ENABLE_CHKSUM);
 173         task_engine->discard_head =
 174             krrp_stream_is_write_flag_set(flags, KRRP_STRMWF_DISCARD_HEAD);
 175         task_engine->leave_tail =
 176             krrp_stream_is_write_flag_set(flags, KRRP_STRMWF_LEAVE_TAIL);
 177 
 178         /*
 179          * Need to dup the nvls because they are part of another nvl,
 180          * that will be destroyed
 181          */
 182         if (ignore_props_list != NULL) {
 183                 task_engine->ignore_props_list =
 184                     fnvlist_dup(ignore_props_list);
 185         }
 186 
 187         if (replace_props_list != NULL) {
 188                 task_engine->replace_props_list =
 189                     fnvlist_dup(replace_props_list);
 190         }
 191 
 192         return (0);
 193 }
 194 
 195 int
 196 krrp_stream_te_fake_read_create(krrp_stream_te_t **result_te,
 197     krrp_error_t *error)
 198 {
 199         return (krrp_stream_te_common_create(result_te,
 200             NULL, B_TRUE, error));
 201 }
 202 
 203 int
 204 krrp_stream_te_fake_write_create(krrp_stream_te_t **result_te,
 205     krrp_error_t *error)
 206 {
 207         return (krrp_stream_te_common_create(result_te,
 208             NULL, B_FALSE, error));
 209 }
 210 
 211 void
 212 krrp_stream_te_destroy(krrp_stream_te_t *task_engine)
 213 {
 214         krrp_stream_task_t *task;
 215 
 216         while ((task = krrp_queue_get_no_wait(task_engine->tasks)) != NULL)
 217                 krrp_stream_task_fini(task);
 218 
 219         krrp_queue_fini(task_engine->tasks);
 220 
 221         if (task_engine->mode == KRRP_STEM_READ) {
 222                 while ((task = krrp_queue_get_no_wait(
 223                     task_engine->tasks_done)) != NULL)
 224                         krrp_stream_task_fini(task);
 225 
 226                 krrp_queue_fini(task_engine->tasks_done);
 227 
 228                 while ((task = krrp_queue_get_no_wait(
 229                     task_engine->tasks_done2)) != NULL)
 230                         krrp_stream_task_fini(task);
 231 
 232                 krrp_queue_fini(task_engine->tasks_done2);
 233         }
 234 
 235         kmem_cache_destroy(task_engine->tasks_cache);
 236 
 237         if (task_engine->global_zfs_ctx != NULL) {
 238                 dmu_krrp_stream_fini(task_engine->global_zfs_ctx);
 239                 task_engine->global_zfs_ctx = NULL;
 240         }
 241 
 242         if (task_engine->ignore_props_list != NULL)
 243                 fnvlist_free(task_engine->ignore_props_list);
 244 
 245         if (task_engine->replace_props_list != NULL)
 246                 fnvlist_free(task_engine->replace_props_list);
 247 
 248         kmem_free(task_engine, sizeof (krrp_stream_te_t));
 249 }
 250 
 251 static int
 252 krrp_stream_te_common_create(krrp_stream_te_t **result_te,
 253     const char *dataset, boolean_t read_mode, krrp_error_t *error)
 254 {
 255         krrp_stream_te_t *task_engine;
 256         char kmem_cache_name[KSTAT_STRLEN];
 257 
 258         ASSERT(result_te != NULL && *result_te == NULL);
 259 
 260         task_engine = kmem_zalloc(sizeof (krrp_stream_te_t), KM_SLEEP);
 261 
 262         if (dataset != NULL) {
 263                 task_engine->global_zfs_ctx = dmu_krrp_stream_init();
 264                 if (task_engine->global_zfs_ctx == NULL) {
 265                         kmem_free(task_engine, sizeof (krrp_stream_te_t));
 266                         krrp_error_set(error, KRRP_ERRNO_ZFSGCTXFAIL, 0);
 267                         return (-1);
 268                 }
 269 
 270                 task_engine->dataset = dataset;
 271         } else
 272                 task_engine->fake_mode = B_TRUE;
 273 
 274         krrp_queue_init(&task_engine->tasks, sizeof (krrp_stream_task_t),
 275             offsetof(krrp_stream_task_t, node));
 276 
 277         (void) snprintf(kmem_cache_name, KSTAT_STRLEN,
 278             "krrp_stc_%p", (void *)task_engine);
 279 
 280         task_engine->tasks_cache = kmem_cache_create(kmem_cache_name,
 281             sizeof (krrp_stream_task_t), 0, &krrp_stream_task_constructor,
 282             &krrp_stream_task_destructor, NULL, (void *)task_engine,
 283             NULL, KM_SLEEP);
 284 
 285         if (read_mode) {
 286                 task_engine->mode = KRRP_STEM_READ;
 287                 /*
 288                  * This queue contains tasks, that were completely read
 289                  * from ZFS and are being sent to the receiver
 290                  */
 291                 krrp_queue_init(&task_engine->tasks_done,
 292                     sizeof (krrp_stream_task_t),
 293                     offsetof(krrp_stream_task_t, node));
 294                 /*
 295                  * This queue contains tasks/threads waiting to receive
 296                  * acknowledgement from the receiver that the sent blocks
 297                  * have been written to the disk.
 298                  */
 299                 krrp_queue_init(&task_engine->tasks_done2,
 300                     sizeof (krrp_stream_task_t),
 301                     offsetof(krrp_stream_task_t, node));
 302         } else
 303                 task_engine->mode = KRRP_STEM_WRITE;
 304 
 305         *result_te = task_engine;
 306 
 307         return (0);
 308 }
 309 
 310 /*
 311  * Total number of task is the sum of tasks that are:
 312  *   waiting for write to ZFS on the receiver (task_engine->tasks_done2)
 313  *
 314  *   waiting for read from ZFS or in process of being sent to the receiver
 315  *     task_engine->tasks + task_engine->tasks_done
 316  */
 317 size_t
 318 krrp_stream_te_total_num_tasks(krrp_stream_te_t *task_engine)
 319 {
 320         return (krrp_queue_length(task_engine->tasks) +
 321             krrp_queue_length(task_engine->tasks_done) +
 322             krrp_queue_length(task_engine->tasks_done2));
 323 }
 324 
 325 /*
 326  * Pending tasks are the tasks, that are waiting for read from ZFS
 327  */
 328 size_t
 329 krrp_stream_te_num_pending_tasks(krrp_stream_te_t *task_engine)
 330 {
 331         return (krrp_queue_length(task_engine->tasks));
 332 }
 333 
 334 void
 335 krrp_stream_read_task_init(krrp_stream_te_t *task_engine, uint64_t txg,
 336     const char *src_snap, const char *src_inc_snap, nvlist_t *resume_info)
 337 {
 338         ASSERT(task_engine->mode == KRRP_STEM_READ);
 339 
 340         ASSERT(src_snap != NULL && strlen(src_snap) != 0);
 341 
 342         krrp_stream_task_t *task;
 343 
 344         task = kmem_cache_alloc(task_engine->tasks_cache, KM_SLEEP);
 345 
 346         task->txg = txg;
 347 
 348         (void) strlcpy(task->zargs.from_snap, src_snap,
 349             sizeof (task->zargs.from_snap));
 350         if (src_inc_snap != NULL)
 351                 (void) strlcpy(task->zargs.from_incr_base, src_inc_snap,
 352                     sizeof (task->zargs.from_incr_base));
 353         else
 354                 task->zargs.from_incr_base[0] = '\0';
 355 
 356         task->zargs.resume_info =
 357             resume_info != NULL ? fnvlist_dup(resume_info) : NULL;
 358 
 359         task->init_hrtime = gethrtime();
 360 
 361         krrp_queue_put(task_engine->tasks, task);
 362 }
 363 
 364 void
 365 krrp_stream_fake_read_task_init(krrp_stream_te_t *task_engine,
 366     uint64_t fake_data_sz)
 367 {
 368         ASSERT(task_engine->mode == KRRP_STEM_READ);
 369         ASSERT(fake_data_sz != 0);
 370 
 371         krrp_stream_task_t *task;
 372 
 373         task = kmem_cache_alloc(task_engine->tasks_cache, KM_SLEEP);
 374 
 375         task->txg = KRRP_FAKE_TXG;
 376         task->fake_data_sz = fake_data_sz;
 377         task->init_hrtime = gethrtime();
 378 
 379         krrp_queue_put(task_engine->tasks, task);
 380 }
 381 
 382 void
 383 krrp_stream_write_task_init(krrp_stream_te_t *task_engine, uint64_t txg,
 384     krrp_stream_task_t **result_task, nvlist_t *resume_info)
 385 {
 386         ASSERT(task_engine->mode == KRRP_STEM_WRITE);
 387 
 388         krrp_stream_task_t *task;
 389 
 390         task = kmem_cache_alloc(task_engine->tasks_cache, KM_SLEEP);
 391 
 392         task->txg = txg;
 393 
 394         task->txg_start = UINT64_MAX;
 395         task->txg_end = UINT64_MAX;
 396 
 397         task->zargs.resume_info =
 398             resume_info != NULL ? fnvlist_dup(resume_info) : NULL;
 399 
 400         if (!task_engine->fake_mode)
 401                 task->zfs_ctx = dmu_krrp_init_recv_task(&task->zargs);
 402 
 403         *result_task = task;
 404 }
 405 
 406 hrtime_t
 407 krrp_stream_task_calc_rpo(krrp_stream_task_t *task)
 408 {
 409         ASSERT(task->engine->mode == KRRP_STEM_READ);
 410 
 411         return (gethrtime() - task->init_hrtime);
 412 }
 413 
 414 void
 415 krrp_stream_task_fini(krrp_stream_task_t *task)
 416 {
 417         krrp_stream_te_t *task_engine = task->engine;
 418 
 419         task->txg = 0;
 420         task->zfs_ctx = NULL;
 421         task->done = B_FALSE;
 422 
 423         kmem_cache_free(task_engine->tasks_cache, task);
 424 }
 425 
 426 void
 427 krrp_stream_task_engine_get_task(krrp_stream_te_t *task_engine,
 428     krrp_stream_task_t **result_stream_task)
 429 {
 430         *result_stream_task = krrp_queue_get(task_engine->tasks);
 431 }
 432 
 433 /* ARGSUSED */
 434 static int
 435 krrp_stream_task_constructor(void *opaque_task,
 436     void *opaque_task_engine, int km_flags)
 437 {
 438         krrp_stream_task_t *task = opaque_task;
 439         krrp_stream_te_t *task_engine = opaque_task_engine;
 440 
 441         bzero(&task->zargs, sizeof (kreplication_zfs_args_t));
 442 
 443         task->zargs.stream_handler = task_engine->global_zfs_ctx;
 444         task->zargs.force_cksum = task_engine->enable_cksum;
 445 
 446         switch (task_engine->mode) {
 447         case KRRP_STEM_READ:
 448                 if (task_engine->fake_mode) {
 449                         task->process = &krrp_stream_task_fake_read_handler;
 450                         task->start = &krrp_stream_task_fake_common_action;
 451                         task->shutdown = &krrp_stream_task_fake_common_action;
 452                 } else {
 453                         (void) strlcpy(task->zargs.from_ds,
 454                             task_engine->dataset,
 455                             sizeof (task->zargs.from_ds));
 456 
 457                         task->process = &krrp_stream_task_read_handler;
 458                         task->start = &krrp_stream_task_read_start;
 459                         task->shutdown = &krrp_stream_task_common_stop;
 460 
 461                         task->zargs.skip_snaps_prop_name =
 462                             task_engine->skip_snaps_prop_name;
 463                         task->zargs.skip_snaps_prop_val =
 464                             task_engine->skip_snaps_prop_val;
 465 
 466                         task->zargs.mem_check_cb =
 467                             task_engine->mem_check_cb;
 468                         task->zargs.mem_check_cb_arg =
 469                             task_engine->mem_check_cb_arg;
 470                 }
 471 
 472                 task->zargs.do_all = task_engine->incremental_package;
 473                 task->zargs.properties = task_engine->properties;
 474                 task->zargs.recursive = task_engine->recursive;
 475                 task->zargs.embedok = task_engine->embedded;
 476                 task->zargs.compressok = task_engine->compressed;
 477                 task->zargs.large_block_ok = task_engine->large_blocks;
 478 
 479                 break;
 480         case KRRP_STEM_WRITE:
 481                 if (task_engine->fake_mode) {
 482                         task->process = &krrp_stream_task_fake_write_handler;
 483                         task->shutdown = &krrp_stream_task_fake_common_action;
 484                 } else {
 485                         (void) strlcpy(task->zargs.to_ds, task_engine->dataset,
 486                             sizeof (task->zargs.to_ds));
 487 
 488                         task->process = &krrp_stream_task_write_handler;
 489                         task->shutdown = &krrp_stream_task_common_stop;
 490                 }
 491 
 492                 task->zargs.force = task_engine->force_receive;
 493                 task->zargs.ignore_list = task_engine->ignore_props_list;
 494                 task->zargs.replace_list = task_engine->replace_props_list;
 495                 task->zargs.strip_head = task_engine->discard_head;
 496                 task->zargs.leave_tail = task_engine->leave_tail;
 497 
 498                 break;
 499         default:
 500                 VERIFY(0);
 501                 break;
 502         }
 503 
 504         if (task_engine->fake_mode) {
 505                 mutex_init(&task->mtx, NULL, MUTEX_DEFAULT, NULL);
 506                 cv_init(&task->cv, NULL, CV_DEFAULT, NULL);
 507         }
 508 
 509         task->engine = task_engine;
 510 
 511         task->txg = 0;
 512         task->zfs_ctx = NULL;
 513         task->done = B_FALSE;
 514 
 515         return (0);
 516 }
 517 
 518 static void
 519 krrp_stream_task_destructor(void *opaque_task,
 520     void *opaque_task_engine)
 521 {
 522         krrp_stream_task_t *task = opaque_task;
 523         krrp_stream_te_t *task_engine = opaque_task_engine;
 524 
 525         if (task_engine->fake_mode) {
 526                 cv_destroy(&task->cv);
 527                 mutex_destroy(&task->mtx);
 528         }
 529 }
 530 
 531 static int
 532 krrp_stream_task_fake_write_handler(krrp_stream_task_t *task,
 533     krrp_pdu_data_t *pdu)
 534 {
 535         krrp_stream_task_fake_rate_limit(task,
 536             gethrtime(), pdu->cur_data_sz);
 537 
 538         if (pdu->final)
 539                 task->done = B_TRUE;
 540 
 541         return (0);
 542 }
 543 
 544 static int
 545 krrp_stream_task_write_handler(krrp_stream_task_t *task, krrp_pdu_data_t *pdu)
 546 {
 547         int rc = 0;
 548         kreplication_buffer_t *kr_buf = NULL;
 549 
 550         kr_buf = (kreplication_buffer_t *)pdu->dblk;
 551 
 552         ASSERT(task->zfs_ctx != NULL);
 553 
 554         if (pdu->final)
 555                 task->done = B_TRUE;
 556 
 557         if (kr_buf->data_size != 0)
 558                 rc = dmu_krrp_lend_recv_buffer(task->zfs_ctx, kr_buf);
 559 
 560 #ifdef KRRP_STREAM_TASK_DEBUG
 561         VERIFY3U(rc, ==, 0);
 562 #endif
 563 
 564         return (rc);
 565 }
 566 
/*
 * Fake read-side PDU handler: "fills" the PDU's dblk chain with synthetic
 * data, consuming task->fake_data_sz, without reading from ZFS.
 *
 * Each dblk in the chain gets min(remaining fake data, dblk->max_data_sz)
 * as its cur_data_sz, and that amount is added to pdu->cur_data_sz.
 * NOTE(review): this accumulates into pdu->cur_data_sz, so it assumes the
 * caller passes it in as 0 - confirm.
 *
 * When the fake data runs out, the PDU and the task are marked final/done.
 * Afterwards pdu->txg is set to the task's txg and the optional fake rate
 * limit is applied, measured from this handler's entry.  Always returns 0.
 */
static int
krrp_stream_task_fake_read_handler(krrp_stream_task_t *task,
    krrp_pdu_data_t *pdu)
{
	krrp_dblk_t *dblk;
	hrtime_t start = gethrtime();

	dblk = pdu->dblk;

	while (dblk != NULL) {
		if (task->fake_data_sz > dblk->max_data_sz)
			dblk->cur_data_sz = dblk->max_data_sz;
		else
			dblk->cur_data_sz = task->fake_data_sz;

		task->fake_data_sz -= dblk->cur_data_sz;
		pdu->cur_data_sz += dblk->cur_data_sz;

		if (task->fake_data_sz == 0) {
			/*
			 * If the data ran out exactly on the last dblk of the
			 * chain, that dblk is emptied and its bytes are not
			 * counted.  Presumably this keeps the same "ends with
			 * a zero-length dblk" shape that
			 * krrp_stream_task_correct_total_size() relies on for
			 * real streams.  NOTE(review): confirm that dropping
			 * these fake bytes is intended.
			 */
			if (dblk->next == NULL) {
				pdu->cur_data_sz -= dblk->cur_data_sz;
				dblk->cur_data_sz = 0;
			}

			pdu->final = B_TRUE;
			task->done = B_TRUE;
			break;
		}

		dblk = dblk->next;
	}

	pdu->txg = task->txg;

	krrp_stream_task_fake_rate_limit(task, start, pdu->cur_data_sz);

	return (0);
}
 605 
 606 static void
 607 krrp_stream_task_fake_rate_limit(krrp_stream_task_t *task,
 608     hrtime_t start_time, uint32_t data_sz)
 609 {
 610         hrtime_t diff, delay;
 611 
 612         if (krrp_stream_fake_rate_limit_mb == 0 || data_sz == 0) {
 613                 DTRACE_PROBE1(rate_limit_delay1, uint64_t, 0);
 614                 return;
 615         }
 616 
 617         diff = gethrtime() - start_time;
 618         delay = ((hrtime_t)data_sz * NANOSEC) /
 619             ((hrtime_t)krrp_stream_fake_rate_limit_mb * 1024 * 1024);
 620         if (diff > delay) {
 621                 DTRACE_PROBE2(rate_limit_delay2, uint64_t, diff,
 622                     uint64_t, delay);
 623                 return;
 624         }
 625 
 626         delay = delay - diff;
 627 
 628         DTRACE_PROBE1(rate_limit_delay3, uint64_t, delay);
 629 
 630         mutex_enter(&task->mtx);
 631         (void) cv_timedwait_hires(&task->cv, &task->mtx,
 632             delay, USEC2NSEC(100), CALLOUT_FLAG_ROUNDUP);
 633         mutex_exit(&task->mtx);
 634 
 635         diff = gethrtime() - start_time;
 636         DTRACE_PROBE2(rate_limit_delay4, uint64_t, delay, uint64_t, diff);
 637 }
 638 
 639 static int
 640 krrp_stream_task_read_handler(krrp_stream_task_t *task, krrp_pdu_data_t *pdu)
 641 {
 642         int rc = 0;
 643         kreplication_buffer_t *kr_buf = NULL;
 644 
 645         kr_buf = (kreplication_buffer_t *)pdu->dblk;
 646 
 647         ASSERT(task->zfs_ctx != NULL);
 648 
 649         pdu->txg = task->txg;
 650 
 651         /*
 652          * dmu_krrp_lend_send_buffer always fill all available space.
 653          * only in case of ENODATA it may not fill all available space.
 654          */
 655         pdu->cur_data_sz = pdu->max_data_sz;
 656         rc = dmu_krrp_lend_send_buffer(task->zfs_ctx, kr_buf);
 657 
 658         /* so in this case need to recalculate total size */
 659         if (rc == ENODATA) {
 660                 pdu->final = B_TRUE;
 661                 task->done = B_TRUE;
 662                 rc = 0;
 663                 pdu->cur_data_sz = 0;
 664                 krrp_stream_task_correct_total_size(pdu->dblk,
 665                     &pdu->cur_data_sz);
 666                 VERIFY3U(pdu->cur_data_sz, <=, pdu->max_data_sz);
 667         }
 668 
 669 #ifdef KRRP_STREAM_TASK_DEBUG
 670         VERIFY3U(rc, ==, 0);
 671 #endif
 672 
 673         return (rc);
 674 }
 675 
/*
 * start hook for real read tasks: creates the ZFS send context from the
 * task's zargs.  The context is later released by
 * krrp_stream_task_common_stop().
 * NOTE(review): a NULL return from dmu_krrp_init_send_task() is not
 * checked here; krrp_stream_task_read_handler() only ASSERTs non-NULL.
 */
static void
krrp_stream_task_read_start(krrp_stream_task_t *task)
{
	task->zfs_ctx = dmu_krrp_init_send_task(&task->zargs);
}
 681 
/*
 * start/shutdown hook for fake tasks.  Fake tasks hold no ZFS context, so
 * there is nothing to set up or tear down.
 */
/* ARGSUSED */
static void
krrp_stream_task_fake_common_action(krrp_stream_task_t *task)
{
	/* nothing to do */
}
 688 
 689 static void
 690 krrp_stream_task_common_stop(krrp_stream_task_t *task)
 691 {
 692         ASSERT(task->zfs_ctx != NULL);
 693 
 694 #ifdef KRRP_STREAM_TASK_DEBUG
 695         VERIFY3U(dmu_krrp_fini_task(task->zfs_ctx), ==, 0);
 696 #else
 697         (void) dmu_krrp_fini_task(task->zfs_ctx);
 698 #endif
 699 
 700         task->zfs_ctx = NULL;
 701 }
 702 
 703 static void
 704 krrp_stream_task_correct_total_size(krrp_dblk_t *dblk_head,
 705     size_t *total_data_sz)
 706 {
 707         krrp_dblk_t *dblk;
 708 
 709         dblk = dblk_head;
 710         while (dblk != NULL) {
 711                 *total_data_sz += dblk->cur_data_sz;
 712                 if (dblk->cur_data_sz == 0)
 713                         break;
 714 
 715                 dblk = dblk->next;
 716         }
 717 }