1 /*
   2  * This file and its contents are supplied under the terms of the
   3  * Common Development and Distribution License ("CDDL"), version 1.0.
   4  * You may only use this file in accordance with the terms of version
   5  * 1.0 of the CDDL.
   6  *
   7  * A full copy of the text of the CDDL should have accompanied this
   8  * source.  A copy of the CDDL is also available via the Internet at
   9  * http://www.illumos.org/license/CDDL.
  10  */
  11 
  12 /*
  13  * Copyright 2018 Nexenta Systems, Inc.  All rights reserved.
  14  */
  15 
  16 /*
  17  * Stream module
  18  */
  19 
  20 #include <sys/types.h>
  21 #include <sys/kmem.h>
  22 #include <sys/ddi.h>
  23 #include <sys/sunddi.h>
  24 #include <sys/sunldi.h>
  25 #include <sys/time.h>
  26 #include <sys/sdt.h>
  27 #include <sys/sysmacros.h>
  28 #include <sys/modctl.h>
  29 #include <sys/class.h>
  30 #include <sys/cmn_err.h>
  31 
  32 #include "krrp_stream.h"
  33 
  34 #define is_str_empty(str) (str[0] == '\0')
  35 
  36 #define copy_str(dst_str, src_str, dst_str_max_sz) \
  37         ((strlcpy(dst_str, src_str, dst_str_max_sz) < dst_str_max_sz) ? 0 : -1)
  38 
  39 /* #define      KRRP_STREAM_DEBUG 1 */
  40 
/*
 * This variable is a safeguard for continuous replication sessions.
 * It defines the number of additional read-tasks allowed per session.
 * Each read-task corresponds to one autosnapshot of such a session.
 * When the total number of read-tasks reaches
 * keep_snaps + krrp_add_num_read_tasks, the corresponding session
 * stops confirming create-requests from Autosnap.
 */
  49 size_t krrp_add_num_read_tasks = 5;
  50 
  51 
  52 /* These extern functions are part of ZFS sources */
  53 extern int wbc_check_dataset(const char *name);
  54 extern uint64_t dsl_dataset_creation_txg(const char *name);
  55 extern int dmu_krrp_decode_resume_token(const char *resume_token,
  56     nvlist_t **resume_info);
  57 
  58 
  59 typedef void (krrp_stream_handler_t)(void *);
  60 
  61 static krrp_stream_t *krrp_stream_common_create(void);
  62 static void krrp_stream_task_done(krrp_stream_t *, krrp_stream_task_t *,
  63     boolean_t);
  64 static void krrp_stream_callback(krrp_stream_t *, krrp_stream_cb_ev_t,
  65     uintptr_t);
  66 
  67 static void krrp_stream_calc_avg_rpo(krrp_stream_t *, krrp_stream_task_t *,
  68     boolean_t);
  69 static void krrp_stream_read(void *);
  70 static void krrp_stream_write(void *);
  71 
  72 static int krrp_stream_activate_autosnap(krrp_stream_t *stream,
  73     krrp_error_t *error);
  74 static void krrp_stream_autosnap_restore_cb(void *void_stream,
  75     const char *snap_name, uint64_t txg);
  76 
  77 static int krrp_stream_validate_run(krrp_stream_t *stream,
  78     krrp_error_t *error);
  79 
  80 static uint64_t krrp_stream_get_snap_txg(krrp_stream_t *stream,
  81     const char *short_snap_name);
  82 
  83 static boolean_t
  84 krrp_stream_check_mem(size_t required_mem, void *void_stream);
  85 
  86 #if 0
  87 static void krrp_stream_debug(const char *, void *, void *, void *, void *);
  88 #endif
  89 
  90 static void krrp_stream_snap_create_error_cb(const char *, int,
  91     uint64_t, void *);
  92 static boolean_t krrp_stream_read_snap_confirm_cb(const char *, boolean_t,
  93     uint64_t, void *);
  94 static boolean_t krrp_stream_write_snap_notify_cb(const char *, boolean_t,
  95     boolean_t, uint64_t, uint64_t, void *);
  96 static boolean_t krrp_stream_read_snap_notify_cb(const char *, boolean_t,
  97     boolean_t, uint64_t, uint64_t, void *);
  98 
  99 int
 100 krrp_stream_read_create(krrp_stream_t **result_stream,
 101     size_t keep_snaps, const char *dataset, const char *base_snap_name,
 102     const char *incr_snap_name, const char *resume_token,
 103     krrp_stream_read_flag_t flags, const char *skip_snaps_mask,
 104     krrp_error_t *error)
 105 {
 106         krrp_stream_t *stream;
 107         int rc;
 108 
 109         VERIFY(result_stream != NULL && *result_stream == NULL);
 110         VERIFY(dataset != NULL);
 111 
 112         stream = krrp_stream_common_create();
 113 
 114         stream->notify_txg = UINT64_MAX;
 115         stream->mode = KRRP_STRMM_READ;
 116         stream->recursive =
 117             krrp_stream_is_read_flag_set(flags, KRRP_STRMRF_RECURSIVE);
 118         stream->keep_snaps = keep_snaps;
 119 
 120         rc = copy_str(stream->dataset, dataset, sizeof (stream->dataset));
 121         if (rc != 0 || is_str_empty(dataset)) {
 122                 krrp_error_set(error, KRRP_ERRNO_SRCDS, EINVAL);
 123                 goto err;
 124         }
 125 
 126         /* Source and Common snapshots must not be equal */
 127         if (base_snap_name != NULL && incr_snap_name != NULL &&
 128             strcmp(base_snap_name, incr_snap_name) == 0) {
 129                 krrp_error_set(error, KRRP_ERRNO_STREAM, EINVAL);
 130                 goto err;
 131         }
 132 
 133         /*
 134          * If base_snap_name is defined then the stream will be non_continuous,
 135          * that means: only one task will be processed
 136          */
 137         if (base_snap_name != NULL) {
 138                 stream->non_continuous = B_TRUE;
 139 
 140                 rc = copy_str(stream->base_snap_name, base_snap_name,
 141                     sizeof (stream->base_snap_name));
 142                 if (rc != 0 || is_str_empty(base_snap_name)) {
 143                         krrp_error_set(error, KRRP_ERRNO_SRCSNAP, EINVAL);
 144                         goto err;
 145                 }
 146 
 147                 if (resume_token != NULL) {
 148                         rc = dmu_krrp_decode_resume_token(resume_token,
 149                             &stream->resume_info);
 150                         if (rc != 0) {
 151                                 krrp_error_set(error,
 152                                     KRRP_ERRNO_RESUMETOKEN, rc);
 153                                 goto err;
 154                         }
 155                 }
 156         }
 157 
 158         if (incr_snap_name != NULL) {
 159                 rc = copy_str(stream->incr_snap_name, incr_snap_name,
 160                     sizeof (stream->incr_snap_name));
 161                 if (rc != 0 || is_str_empty(incr_snap_name)) {
 162                         krrp_error_set(error, KRRP_ERRNO_CMNSNAP, EINVAL);
 163                         goto err;
 164                 }
 165         }
 166 
 167         rc = krrp_stream_te_read_create(&stream->task_engine,
 168             stream->dataset, flags, &krrp_stream_check_mem,
 169             stream, skip_snaps_mask, error);
 170         if (rc != 0)
 171                 goto err;
 172 
 173         krrp_autosnap_rside_create(&stream->autosnap,
 174             stream->keep_snaps, stream->dataset, stream->recursive);
 175 
 176         *result_stream = stream;
 177 
 178         return (0);
 179 
 180 err:
 181         krrp_stream_destroy(stream);
 182 
 183         return (-1);
 184 }
 185 
 186 int
 187 krrp_stream_write_create(krrp_stream_t **result_stream,
 188     size_t keep_snaps, const char *dataset, const char *incr_snap_name,
 189     const char *resume_token, krrp_stream_write_flag_t flags,
 190     nvlist_t *ignore_props_list, nvlist_t *replace_props_list,
 191     krrp_error_t *error)
 192 {
 193         krrp_stream_t *stream;
 194         int rc;
 195 
 196         VERIFY(result_stream != NULL && *result_stream == NULL);
 197         VERIFY(dataset != NULL);
 198 
 199         stream = krrp_stream_common_create();
 200 
 201         stream->mode = KRRP_STRMM_WRITE;
 202         stream->keep_snaps = keep_snaps;
 203 
 204         rc = copy_str(stream->dataset, dataset, sizeof (stream->dataset));
 205         if (rc != 0 || is_str_empty(dataset)) {
 206                 krrp_error_set(error, KRRP_ERRNO_DSTDS, EINVAL);
 207                 goto err;
 208         }
 209 
 210         if (resume_token != NULL) {
 211                 rc = dmu_krrp_decode_resume_token(resume_token,
 212                     &stream->resume_info);
 213                 if (rc != 0) {
 214                         krrp_error_set(error,
 215                             KRRP_ERRNO_RESUMETOKEN, rc);
 216                         goto err;
 217                 }
 218         }
 219 
 220         if (incr_snap_name != NULL) {
 221                 rc = copy_str(stream->incr_snap_name, incr_snap_name,
 222                     sizeof (stream->incr_snap_name));
 223                 if (rc != 0 || is_str_empty(incr_snap_name)) {
 224                         krrp_error_set(error, KRRP_ERRNO_CMNSNAP, EINVAL);
 225                         goto err;
 226                 }
 227         }
 228 
 229         rc = krrp_stream_te_write_create(&stream->task_engine,
 230             stream->dataset, flags, ignore_props_list,
 231             replace_props_list, error);
 232         if (rc != 0)
 233                 goto err;
 234 
 235         krrp_autosnap_wside_create(&stream->autosnap,
 236             stream->keep_snaps, stream->dataset);
 237 
 238         *result_stream = stream;
 239 
 240         return (0);
 241 
 242 err:
 243         krrp_stream_destroy(stream);
 244 
 245         return (-1);
 246 }
 247 
 248 int
 249 krrp_stream_fake_read_create(krrp_stream_t **result_stream,
 250     uint64_t fake_data_sz, krrp_error_t *error)
 251 {
 252         int rc = -1;
 253         krrp_stream_t *stream;
 254 
 255         VERIFY(result_stream != NULL && *result_stream == NULL);
 256 
 257         if (fake_data_sz == 0) {
 258                 krrp_error_set(error, KRRP_ERRNO_FAKEDSZ, EINVAL);
 259                 goto out;
 260         }
 261 
 262         stream = krrp_stream_common_create();
 263 
 264         stream->mode = KRRP_STRMM_READ;
 265         stream->fake_mode = B_TRUE;
 266         stream->non_continuous = B_TRUE;
 267         stream->fake_data_sz = fake_data_sz;
 268 
 269         /* Fake never fails */
 270         VERIFY(krrp_stream_te_fake_read_create(&stream->task_engine,
 271             error) == 0);
 272 
 273         *result_stream = stream;
 274         rc = 0;
 275 
 276 out:
 277         return (rc);
 278 }
 279 
 280 int
 281 krrp_stream_fake_write_create(krrp_stream_t **result_stream,
 282     krrp_error_t *error)
 283 {
 284         krrp_stream_t *stream;
 285 
 286         VERIFY(result_stream != NULL && *result_stream == NULL);
 287 
 288         stream = krrp_stream_common_create();
 289 
 290         stream->mode = KRRP_STRMM_WRITE;
 291         stream->fake_mode = B_TRUE;
 292         stream->non_continuous = B_TRUE;
 293 
 294         /* Fake never fails */
 295         VERIFY(krrp_stream_te_fake_write_create(&stream->task_engine,
 296             error) == 0);
 297 
 298         *result_stream = stream;
 299 
 300         return (0);
 301 }
 302 
 303 static krrp_stream_t *
 304 krrp_stream_common_create(void)
 305 {
 306         krrp_stream_t *stream;
 307 
 308         stream = kmem_zalloc(sizeof (krrp_stream_t), KM_SLEEP);
 309 
 310         mutex_init(&stream->mtx, NULL, MUTEX_DEFAULT, NULL);
 311         cv_init(&stream->cv, NULL, CV_DEFAULT, NULL);
 312 
 313         stream->state = KRRP_STRMS_CREATED;
 314 
 315         return (stream);
 316 }
 317 
/*
 * Tear down a stream and free everything it owns.
 *
 * The stream is switched to the STOPPED state and the caller then waits
 * on the stream's condition variable until stream->work_thread is NULL.
 * After that the autosnap handle, the task engine and the decoded resume
 * info are released if they exist, so the function may also be used for
 * a partially constructed stream (as the create-functions do on error).
 */
void
krrp_stream_destroy(krrp_stream_t *stream)
{
        krrp_stream_lock(stream);

        /*
         * NOTE(review): presumably the work thread clears work_thread
         * and signals the cv right before it exits -- that code is not
         * part of this function.
         */
        stream->state = KRRP_STRMS_STOPPED;
        while (stream->work_thread != NULL)
                krrp_stream_cv_wait(stream);

        if (stream->autosnap != NULL)
                krrp_autosnap_destroy(stream->autosnap);

        if (stream->task_engine != NULL)
                krrp_stream_te_destroy(stream->task_engine);

        if (stream->resume_info != NULL)
                fnvlist_free(stream->resume_info);

        krrp_stream_unlock(stream);

        /* The lock must be released before its primitives are destroyed */
        cv_destroy(&stream->cv);
        mutex_destroy(&stream->mtx);

        kmem_free(stream, sizeof (krrp_stream_t));
}
 343 
 344 static boolean_t
 345 krrp_stream_check_mem(size_t required_mem, void *void_stream)
 346 {
 347         krrp_stream_t *stream = void_stream;
 348         size_t available_mem =
 349             krrp_pdu_engine_get_free_mem(stream->data_pdu_engine);
 350 
 351         if (available_mem < required_mem)
 352                 return (B_FALSE);
 353 
 354         return (B_TRUE);
 355 }
 356 
 357 void
 358 krrp_stream_register_callback(krrp_stream_t *stream,
 359     krrp_stream_cb_t *ev_cb, void *ev_cb_arg)
 360 {
 361         VERIFY(ev_cb != NULL);
 362 
 363         krrp_stream_lock(stream);
 364         VERIFY(stream->state == KRRP_STRMS_CREATED);
 365 
 366         stream->state = KRRP_STRMS_READY_TO_RUN;
 367         stream->callback = ev_cb;
 368         stream->callback_arg = ev_cb_arg;
 369 
 370         krrp_stream_unlock(stream);
 371 }
 372 
/*
 * Start the work thread of a stream that is in the READY_TO_RUN state.
 *
 * write_data_queue and data_pdu_engine (which must be of type
 * KRRP_PET_DATA) are remembered in the stream. For non-fake streams
 * the run-time validation and the autosnap activation are done first;
 * if either of them fails, no thread is started and -1 is returned
 * (the failing function is given 'error' to fill in).
 *
 * On success the function returns 0 only after the stream state is no
 * longer READY_TO_RUN.
 */
int
krrp_stream_run(krrp_stream_t *stream, krrp_queue_t *write_data_queue,
    krrp_pdu_engine_t *data_pdu_engine, krrp_error_t *error)
{
        int rc = -1;

        VERIFY(data_pdu_engine != NULL);
        VERIFY(data_pdu_engine->type == KRRP_PET_DATA);
        VERIFY(write_data_queue != NULL);

        krrp_stream_lock(stream);
        VERIFY(stream->state == KRRP_STRMS_READY_TO_RUN);

        stream->data_pdu_engine = data_pdu_engine;
        stream->write_data_queue = write_data_queue;

        /* Fake streams have neither a dataset nor autosnap to check */
        if (!stream->fake_mode) {
                if (krrp_stream_validate_run(stream, error) != 0)
                        goto out;

                if (krrp_stream_activate_autosnap(stream, error) != 0)
                        goto out;
        }

        /* thread_create never fails */
        switch (stream->mode) {
        case KRRP_STRMM_READ:
                /* A fake reader gets its single task prepared here */
                if (stream->fake_mode)
                        krrp_stream_fake_read_task_init(stream->task_engine,
                            stream->fake_data_sz);

                stream->work_thread = thread_create(NULL, 0, &krrp_stream_read,
                    stream, 0, &p0, TS_RUN, minclsyspri);
                break;
        case KRRP_STRMM_WRITE:
                stream->work_thread = thread_create(NULL, 0, &krrp_stream_write,
                    stream, 0, &p0, TS_RUN, minclsyspri);
                break;
        }

        /*
         * Wait until the state has been changed; krrp_stream_read()
         * does this by setting ACTIVE and signalling the cv.
         */
        while (stream->state == KRRP_STRMS_READY_TO_RUN)
                krrp_stream_cv_wait(stream);

        rc = 0;

out:
        krrp_stream_unlock(stream);
        return (rc);
}
 422 
/*
 * Stop a running stream.
 *
 * The stream must be in the ACTIVE or IN_ERROR state. Its state is set
 * to STOPPED and all waiters on the stream's condition variable are
 * woken up. For continuous streams autosnap is deactivated as well.
 */
void
krrp_stream_stop(krrp_stream_t *stream)
{
        krrp_stream_lock(stream);
        VERIFY(stream->state == KRRP_STRMS_ACTIVE ||
            stream->state == KRRP_STRMS_IN_ERROR);

        stream->state = KRRP_STRMS_STOPPED;
        krrp_stream_cv_broadcast(stream);

        /* Only continuous streams are deactivated in autosnap here */
        if (!stream->non_continuous)
                krrp_autosnap_deactivate(stream->autosnap);

        krrp_stream_unlock(stream);
}
 438 
 439 int
 440 krrp_stream_send_stop(krrp_stream_t *stream)
 441 {
 442         int rc = -1;
 443 
 444         ASSERT(stream->mode == KRRP_STRMM_READ);
 445         ASSERT(!stream->non_continuous);
 446 
 447         if (stream->notify_txg == UINT64_MAX) {
 448                 krrp_autosnap_create_snapshot(stream->autosnap);
 449 
 450                 /*
 451                  * To deactivate autosnap-logic need to be sure
 452                  * that an autosnap has been created
 453                  *
 454                  * Autosnap-service may delay creation of snapshot,
 455                  * so here we may wait for some time (1-2 transactions)
 456                  */
 457                 krrp_stream_lock(stream);
 458 
 459                 stream->wait_for_snap = B_TRUE;
 460                 while (stream->wait_for_snap &&
 461                     stream->state == KRRP_STRMS_ACTIVE)
 462                         krrp_stream_cv_wait(stream);
 463 
 464                 krrp_stream_unlock(stream);
 465 
 466                 krrp_autosnap_deactivate(stream->autosnap);
 467 
 468                 rc = 0;
 469         }
 470 
 471         return (rc);
 472 }
 473 
 474 static int
 475 krrp_stream_validate_run(krrp_stream_t *stream, krrp_error_t *error)
 476 {
 477         int rc = -1;
 478 
 479         /*
 480          * The SOURCE datasets must exist always
 481          *
 482          * The DESTINATION dataset may not exist, that is allowed,
 483          * but if incr_snap_name is defined therefore this replication
 484          * is incremental and destination dataset must exist, exclude
 485          * the cases if the destination dataset is a ZFS pool (the name
 486          * does not contain '/'). For all other cases the parent of
 487          * the destination dataset must exist.
 488          */
 489         if (dsl_dataset_creation_txg(stream->dataset) == UINT64_MAX) {
 490                 if (stream->mode == KRRP_STRMM_READ) {
 491                         krrp_error_set(error, KRRP_ERRNO_SRCDS, ENOENT);
 492                         goto out;
 493                 } else {
 494                         char *p;
 495                         boolean_t parent_exists = B_TRUE;
 496                         boolean_t dst_is_pool = B_TRUE;
 497 
 498                         p = strrchr(stream->dataset, '/');
 499                         if (p != NULL) {
 500                                 *p = '\0';
 501                                 parent_exists =
 502                                     dsl_dataset_creation_txg(stream->dataset) !=
 503                                     UINT64_MAX;
 504                                 *p = '/';
 505                                 dst_is_pool = B_FALSE;
 506                         }
 507 
 508                         if (dst_is_pool || !parent_exists ||
 509                             strlen(stream->incr_snap_name) != 0) {
 510                                 krrp_error_set(error, KRRP_ERRNO_DSTDS, ENOENT);
 511                                 goto out;
 512                         }
 513                 }
 514         }
 515 
 516         rc = 0;
 517         if (stream->mode == KRRP_STRMM_READ && !stream->non_continuous) {
 518                 rc = wbc_check_dataset(stream->dataset);
 519                 if (rc == 0 || rc == ENOTACTIVE)
 520                         rc = 0;
 521                 else
 522                         krrp_error_set(error, KRRP_ERRNO_STREAM, rc);
 523         }
 524 
 525 out:
 526         return (rc);
 527 }
 528 
 529 static int
 530 krrp_stream_activate_autosnap(krrp_stream_t *stream,
 531     krrp_error_t *error)
 532 {
 533         uint64_t incr_snap_txg = UINT64_MAX;
 534         int rc = -1;
 535 
 536         if (strlen(stream->incr_snap_name) != 0) {
 537                 incr_snap_txg = krrp_stream_get_snap_txg(stream,
 538                     stream->incr_snap_name);
 539                 if (incr_snap_txg == UINT64_MAX) {
 540                         krrp_error_set(error, KRRP_ERRNO_CMNSNAP, ENOENT);
 541                         goto out;
 542                 }
 543         }
 544 
 545         switch (stream->mode) {
 546         case KRRP_STRMM_READ:
 547                 if (!stream->non_continuous) {
 548                         rc = krrp_autosnap_activate(stream->autosnap, incr_snap_txg,
 549                             &krrp_stream_read_snap_confirm_cb,
 550                             &krrp_stream_read_snap_notify_cb,
 551                             &krrp_stream_snap_create_error_cb,
 552                             &krrp_stream_autosnap_restore_cb,
 553                             stream, error);
 554                         if (rc != 0)
 555                                 goto out;
 556 
 557                         /*
 558                          * Autosnap does snapshots only in case of I/O
 559                          * to the dataset, so if user has some data on
 560                          * the dataset, but does not have I/O the available
 561                          * data will not be replicated. To exclude this case
 562                          * need to ask autosnap to create snapshot
 563                          */
 564                         krrp_autosnap_create_snapshot(stream->autosnap);
 565                 } else {
 566                         uint64_t base_snap_txg;
 567 
 568                         VERIFY(stream->base_snap_name[0] != '\0');
 569 
 570                         base_snap_txg = krrp_stream_get_snap_txg(stream,
 571                             stream->base_snap_name);
 572                         if (base_snap_txg == UINT64_MAX) {
 573                                 krrp_error_set(error,
 574                                     KRRP_ERRNO_SRCSNAP, ENOENT);
 575                                 goto out;
 576                         }
 577 
 578                         krrp_stream_read_task_init(stream->task_engine,
 579                             base_snap_txg, stream->base_snap_name,
 580                             stream->incr_snap_name, stream->resume_info);
 581 
 582                         /*
 583                          * Non-continuous replication sends only one snapshot.
 584                          * We remember TXG of this snapshot and will notify
 585                          * userspace that the snapshot successfully received
 586                          */
 587                         stream->notify_txg = base_snap_txg;
 588                 }
 589 
 590                 break;
 591         case KRRP_STRMM_WRITE:
 592                 rc = krrp_autosnap_activate(stream->autosnap, incr_snap_txg,
 593                     NULL, &krrp_stream_write_snap_notify_cb,
 594                     &krrp_stream_snap_create_error_cb, NULL, stream, error);
 595                 if (rc != 0)
 596                         goto out;
 597 
 598                 break;
 599         }
 600 
 601         rc = 0;
 602 
 603 out:
 604         return (rc);
 605 }
 606 
 607 static void
 608 krrp_stream_autosnap_restore_cb(void *void_stream,
 609     const char *snap_name, uint64_t txg)
 610 {
 611         krrp_stream_t *stream = void_stream;
 612 
 613         VERIFY(stream->mode == KRRP_STRMM_READ);
 614 
 615         krrp_stream_read_task_init(stream->task_engine, txg, snap_name,
 616             stream->incr_snap_name, NULL);
 617 
 618         (void) strlcpy(stream->incr_snap_name, snap_name,
 619             sizeof (stream->incr_snap_name));
 620 }
 621 
#if 0
/*
 * Debug helper, compiled out by the surrounding conditional.
 * When enabled it calls cmn_err() with CE_PANIC and the message "Debug";
 * none of its arguments are used by the body.
 * NOTE(review): presumably the arguments exist only so that they can be
 * inspected in the resulting crash dump -- confirm before re-enabling.
 */
static void
krrp_stream_debug(const char *msg, void *arg1, void *arg2,
    void *arg3, void *arg4)
{
        cmn_err(CE_PANIC, "Debug");
}
#endif
 630 
 631 /*
 632  * This function is called after a TXG confirmation
 633  * has been received from the receiver
 634  *
 635  * There are two stages of receiving:
 636  * - complete recv into krrp-buffers (complete == B_FALSE)
 637  * - complete recv into ZFS (complete == B_TRUE)
 638  */
 639 void
 640 krrp_stream_txg_confirmed(krrp_stream_t *stream, uint64_t txg,
 641     boolean_t complete)
 642 {
 643         krrp_stream_task_t *task;
 644 
 645         if (complete) {
 646                 DTRACE_PROBE1(krrp_txg_ack2, uint64_t, txg);
 647 
 648                 /* autosnap is used only by CDP sender */
 649                 if (!stream->non_continuous)
 650                         krrp_autosnap_txg_rele(stream->autosnap,
 651                             txg, AUTOSNAP_NO_SNAP);
 652 
 653                 stream->last_full_ack_txg = txg;
 654 
 655                 task = krrp_queue_get(stream->task_engine->tasks_done2);
 656                 krrp_stream_calc_avg_rpo(stream, task, B_TRUE);
 657                 krrp_stream_task_fini(task);
 658 
 659                 if (stream->notify_txg == txg) {
 660                         krrp_stream_callback(stream,
 661                             KRRP_STREAM_SEND_DONE, NULL);
 662                 }
 663 
 664                 return;
 665         }
 666 
 667         stream->last_ack_txg = txg;
 668 
 669         task = krrp_queue_get(stream->task_engine->tasks_done);
 670         ASSERT(task != NULL);
 671 
 672         krrp_stream_calc_avg_rpo(stream, task, B_FALSE);
 673         krrp_queue_put(stream->task_engine->tasks_done2, task);
 674 }
 675 
 676 static void
 677 krrp_stream_calc_avg_rpo(krrp_stream_t *stream, krrp_stream_task_t *task,
 678     boolean_t complete)
 679 {
 680         size_t i, avg_cnt;
 681         uint64_t sum;
 682         krrp_txg_rpo_t *avg_rpo;
 683 
 684         if (complete)
 685                 avg_rpo = &stream->avg_total_rpo;
 686         else
 687                 avg_rpo = &stream->avg_rpo;
 688 
 689         avg_rpo->buf[avg_rpo->cnt] = krrp_stream_task_calc_rpo(task);
 690         avg_rpo->cnt++;
 691         avg_rpo->cnt %= 10;
 692 
 693         sum = 0;
 694         avg_cnt = 10;
 695         for (i = 0; i < 10; i++) {
 696                 sum += avg_rpo->buf[i];
 697                 if (avg_rpo->buf[i] == 0)
 698                         avg_cnt--;
 699         }
 700 
 701         /* Average value in ms */
 702         avg_rpo->value = sum / avg_cnt / 1000 / 1000;
 703 }
 704 
 705 /* ARGSUSED */
 706 static void
 707 krrp_stream_snap_create_error_cb(const char *snap_name, int err,
 708     uint64_t txg, void *void_stream)
 709 {
 710         krrp_stream_t *stream;
 711         krrp_error_t error;
 712         boolean_t just_return = B_FALSE;
 713 
 714         stream = void_stream;
 715 
 716         krrp_stream_lock(stream);
 717 
 718         if (stream->state == KRRP_STRMS_ACTIVE)
 719                 stream->state = KRRP_STRMS_IN_ERROR;
 720         else
 721                 just_return = B_TRUE;
 722 
 723         if (stream->wait_for_snap) {
 724                 stream->wait_for_snap = B_FALSE;
 725                 krrp_stream_cv_signal(stream);
 726         }
 727 
 728         krrp_stream_unlock(stream);
 729 
 730         if (just_return)
 731                 return;
 732 
 733         krrp_error_set(&error, KRRP_ERRNO_SNAPFAIL, err);
 734         krrp_stream_callback(stream, KRRP_STREAM_ERROR,
 735             (uintptr_t)&error);
 736 }
 737 
 738 /* ARGSUSED */
 739 static boolean_t
 740 krrp_stream_read_snap_confirm_cb(const char *snap_name, boolean_t recursive,
 741     uint64_t txg, void *void_stream)
 742 {
 743         krrp_stream_t *stream = void_stream;
 744         boolean_t result = B_FALSE;
 745         size_t krrp_max_num_read_tasks =
 746             stream->keep_snaps + krrp_add_num_read_tasks;
 747 
 748         if (krrp_autosnap_try_hold_to_confirm(stream->autosnap)) {
 749                 size_t tasks =
 750                     krrp_stream_te_total_num_tasks(stream->task_engine);
 751                 if (tasks < krrp_max_num_read_tasks || stream->wait_for_snap)
 752                         result = B_TRUE;
 753                 else
 754                         result = B_FALSE;
 755 
 756                 krrp_autosnap_unhold(stream->autosnap);
 757         }
 758 
 759         return (result);
 760 }
 761 
 762 /*
 763  * Autosnap snap_created callback for the Receiver
 764  */
 765 /* ARGSUSED */
 766 static boolean_t
 767 krrp_stream_write_snap_notify_cb(const char *snap_name, boolean_t recursive,
 768     boolean_t autosnap, uint64_t txg, uint64_t unused, void *void_stream)
 769 {
 770         krrp_stream_t *stream;
 771 
 772         stream = void_stream;
 773 
 774         krrp_stream_lock(stream);
 775 
 776         if (stream->cur_task == NULL) {
 777                 krrp_stream_unlock(stream);
 778                 return (B_FALSE);
 779         }
 780 
 781         if (stream->cur_task->txg_start == UINT64_MAX) {
 782                 stream->cur_task->txg_start = txg;
 783                 stream->cur_task->txg_end = AUTOSNAP_NO_SNAP;
 784         } else {
 785                 stream->cur_task->txg_end = txg;
 786         }
 787 
 788         krrp_stream_unlock(stream);
 789 
 790         return (B_TRUE);
 791 }
 792 
/*
 * Autosnap snap_created callback for the Sender
 *
 * If the autosnap hold cannot be taken, or the task limit
 * (keep_snaps + krrp_add_num_read_tasks) has been reached and nobody
 * waits for a snapshot, B_FALSE is returned and nothing is done.
 *
 * Otherwise a read task is initialized for the new snapshot, with the
 * current incr_snap_name as its incremental source, the short name of
 * the new snapshot becomes the new incr_snap_name and B_TRUE is returned.
 * If somebody waits for the snapshot (wait_for_snap), the TXG is stored
 * into notify_txg and the waiter is woken up.
 */
/* ARGSUSED */
static boolean_t
krrp_stream_read_snap_notify_cb(const char *snap_name, boolean_t recursive,
    boolean_t autosnap, uint64_t txg, uint64_t unused, void *void_stream)
{
        krrp_stream_t *stream = void_stream;
        boolean_t result = B_FALSE;
        size_t krrp_max_num_read_tasks =
            stream->keep_snaps + krrp_add_num_read_tasks;

        if (krrp_autosnap_try_hold_to_confirm(stream->autosnap)) {
                size_t tasks =
                    krrp_stream_te_total_num_tasks(stream->task_engine);
                if (tasks < krrp_max_num_read_tasks || stream->wait_for_snap) {
                        uint64_t cur_snap_txg;
                        char *cur_snap_name;

                        /*
                         * The short snapshot name is the part after '@'.
                         * NOTE(review): the '@' is checked only by ASSERT,
                         * so a name without it is not caught on
                         * non-DEBUG builds.
                         */
                        cur_snap_name = strchr(snap_name, '@');
                        ASSERT(cur_snap_name != NULL);
                        cur_snap_name++;
                        cur_snap_txg = txg;

                        /*
                         * The task must be set up before
                         * incr_snap_name is overwritten
                         */
                        krrp_stream_read_task_init(stream->task_engine,
                            cur_snap_txg, cur_snap_name,
                            stream->incr_snap_name, NULL);

                        (void) strlcpy(stream->incr_snap_name, cur_snap_name,
                            sizeof (stream->incr_snap_name));

                        result = B_TRUE;

                        /*
                         * NOTE(review): wait_for_snap is read and
                         * cleared here without the stream lock; only
                         * the signal itself is sent under the lock.
                         */
                        if (stream->wait_for_snap) {
                                stream->wait_for_snap = B_FALSE;

                                /*
                                 * This snapshot is the last that we will send.
                                 * We remember its TXG and will notify userspace
                                 * that the snapshot successfully received
                                 */
                                stream->notify_txg = txg;

                                krrp_stream_lock(stream);
                                krrp_stream_cv_signal(stream);
                                krrp_stream_unlock(stream);
                        }
                }

                krrp_autosnap_unhold(stream->autosnap);
        }

        return (result);
}
 848 
 849 /*
 850  * The handler for READ STREAM
 851  *
 852  * Stream tasks are created by Autosnaper and pushed to tasks-queue
 853  */
 854 static void
 855 krrp_stream_read(void *arg)
 856 {
 857         krrp_stream_t *stream = arg;
 858 
 859         krrp_stream_task_t *stream_task = NULL;
 860         int rc;
 861         krrp_pdu_data_t *pdu = NULL;
 862 
 863         VERIFY(stream->data_pdu_engine != NULL);
 864 
 865         krrp_stream_lock(stream);
 866         stream->state = KRRP_STRMS_ACTIVE;
 867         krrp_stream_cv_signal(stream);
 868 
 869         while (stream->state == KRRP_STRMS_ACTIVE) {
 870                 krrp_stream_unlock(stream);
 871 
 872                 if (pdu == NULL) {
 873                         DTRACE_PROBE(krrp_pdu_data_alloc_start);
 874 
 875                         krrp_pdu_alloc(stream->data_pdu_engine,
 876                             (krrp_pdu_t **)&pdu, KRRP_PDU_WITH_HDR);
 877 
 878                         DTRACE_PROBE(krrp_pdu_data_alloc_stop);
 879 
 880                         if (pdu == NULL) {
 881                                 krrp_stream_lock(stream);
 882                                 continue;
 883                         }
 884                 }
 885 
 886                 if (stream_task == NULL) {
 887                         krrp_stream_task_engine_get_task(stream->task_engine,
 888                             &stream_task);
 889                         if (stream_task == NULL) {
 890                                 krrp_stream_lock(stream);
 891                                 continue;
 892                         }
 893 
 894                         stream->cur_task = stream_task;
 895                         stream->cur_send_txg = stream_task->txg;
 896                         stream->cur_pdu = pdu;
 897 
 898                         stream_task->start(stream_task);
 899 
 900                         pdu->initial = B_TRUE;
 901 
 902                         DTRACE_PROBE1(krrp_stream_task_io_start, uint64_t,
 903                             stream_task->txg);
 904                 }
 905 
 906                 rc = stream_task->process(stream_task, pdu);
 907 
 908                 if (rc != 0) {
 909                         krrp_pdu_rele((krrp_pdu_t *)pdu);
 910                         pdu = NULL;
 911                         stream->cur_pdu = NULL;
 912 
 913                         krrp_stream_lock(stream);
 914                         if (stream->state == KRRP_STRMS_ACTIVE) {
 915                                 krrp_error_t error;
 916 
 917                                 stream->state = KRRP_STRMS_IN_ERROR;
 918                                 krrp_stream_unlock(stream);
 919 
 920                                 krrp_error_set(&error, KRRP_ERRNO_READFAIL, rc);
 921                                 krrp_stream_callback(stream, KRRP_STREAM_ERROR,
 922                                     (uintptr_t)&error);
 923 
 924                                 krrp_stream_lock(stream);
 925                         }
 926 
 927                         break;
 928                 }
 929 
 930                 if (stream_task->done) {
 931                         DTRACE_PROBE1(krrp_stream_task_io_stop, uint64_t,
 932                             stream_task->txg);
 933 
 934                         krrp_stream_task_done(stream, stream_task, B_FALSE);
 935                         stream_task = NULL;
 936                         stream->cur_task = NULL;
 937 
 938                         stream->last_send_txg = stream->cur_send_txg;
 939                         stream->cur_send_txg = 0;
 940                 }
 941 
 942                 stream->bytes_processed += pdu->cur_data_sz;
 943                 krrp_stream_callback(stream, KRRP_STREAM_DATA_PDU,
 944                     (uintptr_t)pdu);
 945 
 946                 pdu = NULL;
 947                 stream->cur_pdu = NULL;
 948                 krrp_stream_lock(stream);
 949         } /* while() loop */
 950 
 951         stream->cur_task = NULL;
 952         krrp_stream_unlock(stream);
 953 
 954         if (pdu != NULL)
 955                 krrp_pdu_rele((krrp_pdu_t *)pdu);
 956 
 957         if (stream_task != NULL)
 958                 krrp_stream_task_done(stream, stream_task, B_TRUE);
 959 
 960         krrp_stream_lock(stream);
 961 
 962         stream->work_thread = NULL;
 963 
 964         krrp_stream_cv_broadcast(stream);
 965         krrp_stream_unlock(stream);
 966         thread_exit();
 967 }
 968 
 969 /*
 970  * The handler for WRITE STREAM
 971  *
 972  * Stream tasks are created on intial PDU
 973  */
 974 static void
 975 krrp_stream_write(void *arg)
 976 {
 977         krrp_stream_t *stream = arg;
 978 
 979         krrp_pdu_data_t *pdu = NULL;
 980         krrp_stream_task_t *stream_task = NULL;
 981         int rc;
 982 
 983         VERIFY(stream->write_data_queue != NULL);
 984 
 985 #ifdef KRRP_STREAM_DEBUG
 986         krrp_queue_init(&stream->debug_pdu_queue, sizeof (krrp_pdu_t),
 987             offsetof(krrp_pdu_t, node));
 988 #endif
 989 
 990         krrp_stream_lock(stream);
 991         stream->state = KRRP_STRMS_ACTIVE;
 992         krrp_stream_cv_signal(stream);
 993 
 994         while (stream->state == KRRP_STRMS_ACTIVE) {
 995                 krrp_stream_unlock(stream);
 996 
 997                 if (pdu == NULL) {
 998                         pdu = krrp_queue_get(stream->write_data_queue);
 999                         if (pdu == NULL) {
1000                                 krrp_stream_lock(stream);
1001                                 continue;
1002                         }
1003                 }
1004 
1005                 if (pdu->initial) {
1006                         /* Replace by ASSERT */
1007                         VERIFY(stream_task == NULL);
1008 
1009                         krrp_stream_write_task_init(stream->task_engine,
1010                             pdu->txg, &stream_task, stream->resume_info);
1011 
1012                         /*
1013                          * Cookies are required only for
1014                          * the first received task
1015                          */
1016                         if (stream->resume_info != NULL) {
1017                                 fnvlist_free(stream->resume_info);
1018                                 stream->resume_info = NULL;
1019                         }
1020 
1021                         stream->cur_task = stream_task;
1022                         stream->cur_recv_txg = pdu->txg;
1023 
1024                         DTRACE_PROBE1(krrp_stream_task_io_start, uint64_t,
1025                             stream_task->txg);
1026                 }
1027 
1028                 stream->cur_pdu = pdu;
1029 
1030                 rc = stream_task->process(stream_task, pdu);
1031                 if (rc != 0) {
1032                         krrp_stream_lock(stream);
1033                         if (stream->state == KRRP_STRMS_ACTIVE) {
1034                                 krrp_error_t error;
1035 
1036                                 stream->state = KRRP_STRMS_IN_ERROR;
1037                                 krrp_stream_unlock(stream);
1038 
1039                                 krrp_error_set(&error,
1040                                     KRRP_ERRNO_WRITEFAIL, rc);
1041                                 krrp_stream_callback(stream, KRRP_STREAM_ERROR,
1042                                     (uintptr_t)&error);
1043                                 krrp_stream_lock(stream);
1044                         }
1045 
1046                         break;
1047                 }
1048 
1049                 stream->bytes_processed += pdu->cur_data_sz;
1050                 if (stream_task->done) {
1051                         VERIFY(pdu->final == B_TRUE);
1052 
1053                         DTRACE_PROBE1(krrp_stream_task_io_stop, uint64_t,
1054                             stream_task->txg);
1055 
1056                         krrp_stream_task_done(stream, stream_task, B_FALSE);
1057                         stream_task = NULL;
1058                         stream->cur_task = NULL;
1059                         stream->cur_recv_txg = 0;
1060                 }
1061 
1062 #ifdef KRRP_STREAM_DEBUG
1063                 krrp_queue_put(stream->debug_pdu_queue, pdu);
1064 
1065                 if (krrp_queue_length(stream->debug_pdu_queue) > 1) {
1066                         pdu = krrp_queue_get(stream->debug_pdu_queue);
1067                         krrp_pdu_rele((krrp_pdu_t *)pdu);
1068                 }
1069 #else
1070                 krrp_pdu_rele((krrp_pdu_t *)pdu);
1071 #endif
1072 
1073                 pdu = NULL;
1074                 krrp_stream_lock(stream);
1075         } /* while() loop */
1076 
1077         stream->cur_task = NULL;
1078         krrp_stream_unlock(stream);
1079 
1080         if (pdu != NULL)
1081                 krrp_pdu_rele((krrp_pdu_t *)pdu);
1082 
1083 #ifdef KRRP_STREAM_DEBUG
1084         /* Simple get, because this queue without locks */
1085         while ((pdu = krrp_queue_get(stream->debug_pdu_queue)) != NULL)
1086                 krrp_pdu_rele((krrp_pdu_t *)pdu);
1087 
1088         krrp_queue_fini(stream->debug_pdu_queue);
1089 #endif
1090 
1091         if (stream_task != NULL)
1092                 krrp_stream_task_done(stream, stream_task, B_TRUE);
1093 
1094         krrp_stream_lock(stream);
1095 
1096         stream->work_thread = NULL;
1097 
1098         krrp_stream_cv_broadcast(stream);
1099         krrp_stream_unlock(stream);
1100         thread_exit();
1101 }
1102 
1103 static void
1104 krrp_stream_task_done(krrp_stream_t *stream,
1105     krrp_stream_task_t *task, boolean_t only_fini)
1106 {
1107         task->shutdown(task);
1108 
1109         if (only_fini) {
1110                 krrp_stream_task_fini(task);
1111                 return;
1112         }
1113 
1114         switch (stream->mode) {
1115         case KRRP_STRMM_READ:
1116                 if (krrp_stream_te_num_pending_tasks(stream->task_engine) == 0) {
1117                         if (stream->do_ctrl_snap) {
1118                                 krrp_autosnap_create_snapshot(stream->autosnap);
1119                                 stream->do_ctrl_snap = B_FALSE;
1120                         }
1121                 } else {
1122                         stream->do_ctrl_snap = B_TRUE;
1123                 }
1124 
1125                 krrp_queue_put(stream->task_engine->tasks_done, task);
1126                 break;
1127         case KRRP_STRMM_WRITE:
1128                 krrp_autosnap_txg_rele(stream->autosnap,
1129                     task->txg_start, task->txg_end);
1130 
1131                 krrp_stream_callback(stream, KRRP_STREAM_TXG_RECV_DONE,
1132                     (uintptr_t)task->txg);
1133                 krrp_stream_task_fini(task);
1134                 break;
1135         }
1136 }
1137 
1138 static void
1139 krrp_stream_callback(krrp_stream_t *stream,
1140     krrp_stream_cb_ev_t ev, uintptr_t ev_arg)
1141 {
1142         stream->callback(ev, ev_arg, stream->callback_arg);
1143 }
1144 
1145 static uint64_t
1146 krrp_stream_get_snap_txg(krrp_stream_t *stream,
1147     const char *short_snap_name)
1148 {
1149         char full_ds_name[MAXNAMELEN];
1150 
1151         (void) snprintf(full_ds_name, sizeof (full_ds_name), "%s@%s",
1152             stream->dataset, short_snap_name);
1153 
1154         return (dsl_dataset_creation_txg(full_ds_name));
1155 }
1156 
1157 boolean_t
1158 krrp_stream_is_write_flag_set(krrp_stream_write_flag_t flags,
1159     krrp_stream_write_flag_t flag)
1160 {
1161         return ((flags & flag) != 0);
1162 }
1163 
1164 void
1165 krrp_stream_set_write_flag(krrp_stream_write_flag_t *flags,
1166     krrp_stream_write_flag_t flag)
1167 {
1168         *flags |= flag;
1169 }
1170 
1171 boolean_t
1172 krrp_stream_is_read_flag_set(krrp_stream_read_flag_t flags,
1173     krrp_stream_read_flag_t flag)
1174 {
1175         return ((flags & flag) != 0);
1176 }
1177 
1178 void
1179 krrp_stream_set_read_flag(krrp_stream_read_flag_t *flags,
1180     krrp_stream_read_flag_t flag)
1181 {
1182         *flags |= flag;
1183 }